Compare commits

5 Commits

Author SHA1 Message Date
0f6cc3aeaf fixes #8 2026-02-22 06:54:06 +00:00
2d601d5ce1 probably fixed #5, need more testing 2026-02-22 12:09:53 +05:30
fc2e4179c4 add runner crash recovery 2026-02-21 18:42:08 +05:30
Phani Pavan K
f2cc63828a restructured, fixed #7, #1, #9 2026-02-19 18:21:26 +05:30
0b8006467f added tasks 2025-11-03 17:01:03 +05:30
10 changed files with 276 additions and 169 deletions

View File

@@ -9,54 +9,74 @@ Distributed Function Caller Framework for Python
Dependencies: Dependencies:
- websockets - websockets
- fastapi - fastapi
- requests
- uvicorn - uvicorn
- pydantic - pydantic
## Usage: ## Usage:
Requires 3 scripts: Client, Broker and Runner Requires 3 scripts: Client, Router and Runner
- Broker will mediate load balancing and connection handling, so this should start first. One port should be open. ### 1. Router
Let broker's IP be `192.168.0.110` and port be `7732` The Router will mediate load balancing and connection handling between the function runners and callers, so this should start first. One port must be open to facilitate the communication.
This example starts the router on IP `192.168.0.110` and port `7732`.
```python ```python
from harmoney import router as rou from harmoney.router import startRouter
ro.startRouter("0.0.0.0", 7732) startRouter("0.0.0.0", 7732)
``` ```
- Runner performs the calculations, should contain function definitions. Connects to broker using broker's IP. ### 2. Runner
Runner exposes the function calls and contains all the heavy compute logic. Below code connects one instance of runner to the router.
```python ```python
from harmoney import runners as run from harmoney.runner import startRunner
def customFunction(arg1: int, arg2: str) -> str: def customFunction(arg1: int, arg2: str) -> str:
return arg2*arg1 return arg2*arg1
funcs = {"custFn": customFunction} funcs = {"custFn": customFunction}
run.startRunner(funcs, "192.168.0.110", 7732) startRunner(funcs, "192.168.0.110", 7732)
``` ```
- Client is the main caller of functions. Will contain your main code. Functions must return objects that are pickleable. `None` or no return statement is valid.
### 3. Client
Client requests the function - argument combinations to run on runners.
```python ```python
from harmoney import client as cli from harmoney.client import Client
cli.Client("192.168.0.110", 7732) cli = Client("192.168.0.110", 7732)
retVal = cli.singleCall("custFn", arg1=10, arg2="arst") retVal = cli.singleCall("custFn", arg1=10, arg2="arst") # run only this combination
print(retVal) print(retVal)
cli.addCall("custFn", arg1=123, arg2="qwf") # add this combination to queue
cli.addCall("custFn", arg1=321, arg2="ars") # add this combination to queue
res = cli.runAllCalls() # send all queued combinations at once
print(res) # Print all the results
``` ```
Return values from `runAllCalls` are in the same order in which their function combinations were added to the queue.
TODO: TODO:
- [ ] Error catching, keeping the connection to the broker - [ ] Error catching, keeping the connection to the broker
- [ ] Error info should return to the client - [ ] Error info should return to the client
- [ ] Remove dependency on fastapi and requests, move completely to websockets - [ ] Remove dependency on fastapi and requests, move completely to websockets
---
~ A Grammer Society Project.

4
harmoney.md Normal file
View File

@@ -0,0 +1,4 @@
- run multiple processes for given definition automatically
- add debug flags for print statements
- add string method for call packet, for pretty printing
- remove disconnected servers from router

View File

@@ -0,0 +1,3 @@
from .client import Client
__all__ = ["Client"]

View File

@@ -1,44 +1,60 @@
import asyncio import asyncio
from ._callSpec import _CallPacket
import pickle as pkl import pickle as pkl
from websockets.asyncio import client as WSC from websockets.asyncio import client as WSC
from .._callSpec import _CallPacket
__all__ = ["Client"] __all__ = ["Client"]
class Client: class Client:
def __init__(self, host, port) -> None: def __init__(self, host: str, port: str | int) -> None:
self._wsURL = f"ws://{host}:{port}/cliReq/" self._wsURL = f"ws://{host}:{port}/cliReq/"
self.tasks = [] self.tasks = []
def singleCall(self, function, **kwargs): def singleCall(self, function: str, **kwargs):
""" """
Performs single call with the provided function and args Performs single call with the provided function and args
""" """
self.addCall(_CallPacket(procedure=function, data=kwargs)) self.addCall(function, **kwargs)
return self.runAllCalls()[0] return self.runAllCalls()[0]
def addCall(self, function, **kwargs): def addCall(self, function: str, **kwargs) -> int:
""" """
Adds a task to call queue. Adds a task to call queue.
Returns current length of call queue.
""" """
self.tasks.append((_CallPacket(procedure=function, data=kwargs))) self.tasks.append((_CallPacket(procedure=function, data=kwargs)))
return len(self.tasks)
async def _runAllCalls(self, callDelay=0.01): async def _runAllCalls(self) -> list:
""" """
Logic function to communicate with the router. Logic function to communicate with the router.
""" """
print(f"Total Calls: {len(self.tasks)}") if len(self.tasks) == 0:
async with WSC.connect(self._wsURL+f"{len(self.tasks)}", open_timeout=None, ping_interval=10, ping_timeout=None) as ws: return []
print(f"Running {len(self.tasks)} functions")
async with WSC.connect(
self._wsURL + f"{len(self.tasks)}",
open_timeout=None,
ping_interval=10,
ping_timeout=None,
) as ws:
ackCount = int(await ws.recv()) ackCount = int(await ws.recv())
assert ackCount == len(self.tasks), "Comms not proper..." assert ackCount == len(self.tasks), "Comms not proper..."
await ws.send(pkl.dumps(self.tasks)) await ws.send(pkl.dumps(self.tasks))
returnData = await ws.recv() returnData = await ws.recv()
returnData = pkl.loads(returnData) returnData = pkl.loads(returnData)
self.tasks=[] self.tasks.clear()
return returnData return returnData
def runAllCalls(self, callDelay=0.01): def runAllCalls(self):
""" """
User facing function to remotely execute queued tasks. User facing function to remotely execute queued tasks.
""" """
return asyncio.run(self._runAllCalls(callDelay=callDelay)) return asyncio.run(self._runAllCalls())
def clearQueue(self) -> bool:
self.tasks.clear()
return True

View File

@@ -1,107 +0,0 @@
import asyncio
import base64
from threading import Thread
from typing import List
from uvicorn import Config, Server
import fastapi
from uvicorn.config import LOG_LEVELS
import pickle as pkl
import uuid
from ._callSpec import _ClientPacket, _CallPacket
__all__ = ["startRouter"]
class _Router:
    """
    Websocket router that hands client tasks to connected runners.

    Registers two websocket routes on a fastapi ``APIRouter``:
    ``/reg`` for runners and ``/cliReq/{count}`` for clients.
    """
    # NOTE(review): block nesting below was reconstructed from flattened text;
    # confirm against the repository before relying on it.
    def __init__(self, pollingDelay=0.5) -> None:
        """
        Create the routes and the shared bookkeeping containers.

        pollingDelay: seconds slept between checks of ``returnDict`` in ``clientRequest``.
        """
        self.router = fastapi.APIRouter()
        self.router.add_api_websocket_route("/reg", self.registerRunner)
        self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
        # Queue of (reqID, packet) tuples consumed by the runner loops.
        self.taskQueue = asyncio.Queue()
        # Number of runners registered so far; also used as the next runner ID.
        self.runnerCount=0
        # Results keyed by request ID (single value per task, list per client batch).
        self.returnDict = {}
        # Per-batch completion flags keyed by batch request ID.
        self.doneDict = {}
        self.pollingDelay = pollingDelay
    async def registerRunner(self, wsConnection: fastapi.WebSocket):
        """
        Websocket handler for a runner.

        Sends the runner its ID, reads the runner's base64-encoded pickled
        method list, then loops forever: take a task from the queue, send it
        pickled to the runner and store the decoded reply in ``returnDict``.
        """
        await wsConnection.accept()
        await wsConnection.send_text(str(self.runnerCount))
        methods=await wsConnection.receive()
        # The handshake arrives as a text frame holding base64 of a pickled dict.
        methods = pkl.loads(base64.b64decode(methods["text"]))
        print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}")
        runnerID=self.runnerCount
        self.runnerCount+=1
        # Number of tasks this particular runner has been handed.
        runnerCounter = 0
        while True:
            reqID, data = await self.taskQueue.get()
            runnerCounter+=1
            print(f"Runr {runnerID} Counter: {runnerCounter}")
            await wsConnection.send_bytes(pkl.dumps(data))
            retValue = await wsConnection.receive()
            # NOTE(review): a message without a "bytes" key raises KeyError here
            # and ends this handler; nothing in this block handles that case.
            self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
    async def clientRequest(self, data:_ClientPacket):
        """
        Queue a single task and wait for its result.

        Polls ``returnDict`` every ``pollingDelay`` seconds until the task's
        request ID appears, then removes and returns the stored value.
        To be deprecated for better task handling.
        """
        reqID = uuid.uuid4().hex
        callPacket = data
        await self.taskQueue.put((reqID, callPacket))
        while reqID not in self.returnDict:
            await asyncio.sleep(self.pollingDelay)
        returnValue = self.returnDict[reqID]
        self.returnDict.pop(reqID)
        return returnValue
    async def multiClientRequest(self, wsConn:fastapi.WebSocket, count:int):
        """
        Websocket handler for a client batch.

        Echoes ``count`` back, receives a pickled list of task packets, starts
        one daemon thread per task and, once every completion flag is set,
        sends the pickled result list back to the client.
        """
        await wsConn.accept()
        # Window size used by the throttling loop below.
        softLimit=50
        await wsConn.send_text(str(count))
        reqID = uuid.uuid4().hex
        # Result and completion slots are sized from the URL parameter.
        self.returnDict[reqID] = [0]*count
        self.doneDict[reqID] = [0]*count
        print(f"Received {count} tasks")
        taskBytes = await wsConn.receive_bytes()
        taskPackets = pkl.loads(taskBytes)
        # Index of the oldest task the throttling window is waiting on.
        softLimitItr = 0
        for task in range(len(taskPackets)):
            # Hold back new threads while this task is more than softLimit
            # ahead of the window head and the head task is not done.
            while (task > (softLimitItr+softLimit)) and not self.doneDict[reqID][softLimitItr]==1:
                await asyncio.sleep(1)
            if self.doneDict[reqID][softLimitItr]==1:
                softLimitItr+=1
            t=Thread(target=self._worker, args=(reqID, task, taskPackets[task]))
            t.daemon=True
            t.start()
        while not all(self.doneDict[reqID]):
            await asyncio.sleep(1)
        await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID]))
        # NOTE(review): only returnDict is cleaned up; doneDict[reqID] is left in place.
        self.returnDict.pop(reqID)
    def _worker(self, id, idx, data:_ClientPacket):
        """
        Thread worker to handle one task.

        Runs ``clientRequest`` via ``asyncio.run`` in the calling thread, then
        writes the result and the completion flag into slot ``idx`` of batch ``id``.
        To be deprecated for better task handling.
        """
        retVal = asyncio.run(self.clientRequest(data))
        self.returnDict[id][idx]=retVal
        self.doneDict[id][idx]=1
        return
def startRouter(host, port, pollingDelay=0.1, logLevel=3):
    """
    Build the router application and serve it with uvicorn.

    logLevel is a position into uvicorn's LOG_LEVELS mapping.
    The call returns only when the server's run() returns.
    """
    broker = _Router(pollingDelay=pollingDelay)
    application = fastapi.FastAPI()
    application.include_router(broker.router)
    levelName = list(LOG_LEVELS.keys())[logLevel]
    settings = Config(
        app=application,
        host=host,
        port=port,
        log_level=LOG_LEVELS[levelName],
        ws_ping_interval=10,
        ws_ping_timeout=None,
        ws_max_size=1024 * 1024 * 1024,
    )
    Server(config=settings).run()

View File

@@ -0,0 +1,3 @@
from .router import startRouter
__all__ = ["startRouter"]

144
harmoney/router/router.py Normal file
View File

@@ -0,0 +1,144 @@
import asyncio
import base64
import pickle as pkl
import uuid
from threading import Lock, Thread
import fastapi
from uvicorn import Config, Server
from uvicorn.config import LOG_LEVELS
from .._callSpec import _ClientPacket
__all__ = ["startRouter"]
class _Router:
    """
    Websocket router that hands client tasks to connected runners.

    Registers two websocket routes on a fastapi ``APIRouter``:
    ``/reg`` for runners and ``/cliReq/{count}`` for clients.

    NOTE: payloads on both routes are unpickled as received; only expose the
    router to trusted runners and clients.
    """

    def __init__(
        self, pollingDelay: float | int = 0.5, recycleOnError: bool = False
    ) -> None:
        """
        Create the routes and the shared bookkeeping containers.

        pollingDelay: seconds slept between checks of ``returnDict`` in ``clientRequest``.
        recycleOnError: when True, a task whose runner fails is put back on the
            queue instead of being answered with its own call packet.
        """
        self.router = fastapi.APIRouter()
        self.router.add_api_websocket_route("/reg", self.registerRunner)
        self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
        # Queue of (reqID, packet) tuples consumed by the runner loops.
        self.taskQueue = asyncio.Queue()
        # Number of runners registered so far; also the next runner ID.
        self.runnerCount = 0
        # Results keyed by request ID (single value per task, list per client batch).
        self.returnDict = {}
        # Per-batch completion flags keyed by batch request ID.
        self.doneDict = {}
        self.pollingDelay = pollingDelay
        self.recycleOnError = recycleOnError

    async def registerRunner(self, wsConnection: fastapi.WebSocket):
        """
        Websocket handler for a runner.

        Sends the runner its ID, reads the runner's base64-encoded pickled
        method list, then loops: take a task from the queue, send it pickled
        to the runner and store the decoded reply in ``returnDict``.

        If the runner fails while a task is in flight, the task is either
        re-queued (``recycleOnError``) or answered with its own call packet so
        the waiting client is not left polling forever.
        """
        # Reserve the ID before the first await. The previous code read and
        # incremented runnerCount on either side of awaits, so two runners
        # registering at the same time could be given the same ID; the local
        # threading.Lock it used was created per call and protected nothing.
        runnerID = self.runnerCount
        self.runnerCount += 1
        await wsConnection.accept()
        await wsConnection.send_text(str(runnerID))
        methods = await wsConnection.receive()
        # The handshake arrives as a text frame holding base64 of a pickled dict.
        methods = pkl.loads(base64.b64decode(methods["text"]))
        print(f"Runner Connected with ID: {runnerID}, Methods: {methods['methods']}")
        runnerCounter = 0
        # (reqID, data) of the task currently handed to this runner, if any.
        # Replaces the `"reqID" in locals()` check, which stayed true after a
        # task had already completed and could recycle a finished task.
        inFlight = None
        try:
            while True:
                reqID, data = await self.taskQueue.get()
                inFlight = (reqID, data)
                runnerCounter += 1
                print(f"Runr {runnerID} Counter: {runnerCounter}")
                await wsConnection.send_bytes(pkl.dumps(data))
                retValue = await wsConnection.receive()
                if "bytes" not in retValue:
                    # Anything other than a binary frame is treated as a runner failure.
                    raise Exception(
                        f"Runner {runnerID} Crashed!! Check for function error logs on the Runner."
                    )
                self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
                inFlight = None
        except Exception as e:
            print(e)
            if inFlight is not None:
                if self.recycleOnError:
                    # Hand the unfinished task to another runner. No result is
                    # published here, otherwise the client would get the call
                    # packet back and the real result would be orphaned.
                    await self.taskQueue.put(inFlight)
                    print("Recycled this task")
                else:
                    # Answer with the call packet itself so the client unblocks.
                    self.returnDict[inFlight[0]] = inFlight[1]
        print(f"Runner {runnerID} Closed")

    async def clientRequest(self, data: _ClientPacket):
        """
        Queue a single task and wait for its result.

        Polls ``returnDict`` every ``pollingDelay`` seconds until the task's
        request ID appears, then removes and returns the stored value.
        To be deprecated for better task handling.
        """
        reqID = uuid.uuid4().hex
        await self.taskQueue.put((reqID, data))
        while reqID not in self.returnDict:
            await asyncio.sleep(self.pollingDelay)
        return self.returnDict.pop(reqID)

    async def multiClientRequest(self, wsConn: fastapi.WebSocket, count: int):
        """
        Websocket handler for a client batch.

        Echoes ``count`` back, receives a pickled list of task packets, starts
        one daemon thread per task and, once every completion flag is set,
        sends the pickled result list back to the client.
        """
        await wsConn.accept()
        # Window size used by the throttling loop below.
        softLimit = 50
        await wsConn.send_text(str(count))
        reqID = uuid.uuid4().hex
        taskBytes = await wsConn.receive_bytes()
        taskPackets = pkl.loads(taskBytes)
        # Size the slots from the payload actually received. Sizing them from
        # the URL parameter made a mismatching count either wait forever on
        # slots no worker fills or raise IndexError inside a worker thread.
        results = [0] * len(taskPackets)
        done = [0] * len(taskPackets)
        self.returnDict[reqID] = results
        self.doneDict[reqID] = done
        # Index of the oldest task the throttling window is waiting on.
        softLimitItr = 0
        for task, packet in enumerate(taskPackets):
            # Hold back new threads while this task is more than softLimit
            # ahead of the window head and the head task is not done.
            while task > (softLimitItr + softLimit) and done[softLimitItr] != 1:
                await asyncio.sleep(1)
            if done[softLimitItr] == 1:
                softLimitItr += 1
            t = Thread(target=self._worker, args=(reqID, task, packet))
            t.daemon = True
            t.start()
        while not all(done):
            await asyncio.sleep(1)
        try:
            await wsConn.send_bytes(pkl.dumps(results))
        finally:
            # All workers have finished by now, so both entries can be dropped.
            # doneDict was previously never removed and grew with every batch.
            self.returnDict.pop(reqID, None)
            self.doneDict.pop(reqID, None)

    def _worker(self, id, idx, data: _ClientPacket):
        """
        Thread worker to handle one task.

        Runs ``clientRequest`` via ``asyncio.run`` in the calling thread, then
        writes the result and the completion flag into slot ``idx`` of batch ``id``.
        To be deprecated for better task handling.
        """
        retVal = asyncio.run(self.clientRequest(data))
        self.returnDict[id][idx] = retVal
        self.doneDict[id][idx] = 1
        return
def startRouter(
    host: str,
    port: str | int,
    pollingDelay: float | int = 0.1,
    logLevel: int = 3,
    recycleOnError: bool = False,
):
    """
    Main function to start the router system.

    host: interface to bind to.
    port: port to listen on; converted with int().
    pollingDelay: seconds between result checks, passed to the router.
    logLevel: position into uvicorn's LOG_LEVELS mapping.
    recycleOnError: passed to the router; when True a task whose runner fails
        is put back on the queue. Previously this router option could not be
        set through this entry point.

    The call returns only when the server's run() returns.
    """
    br = _Router(pollingDelay=pollingDelay, recycleOnError=recycleOnError)
    app = fastapi.FastAPI()
    app.include_router(br.router)
    level = list(LOG_LEVELS.keys())[logLevel]
    serverConf = Config(
        app=app,
        host=host,
        port=int(port),
        log_level=LOG_LEVELS[level],
        ws_ping_interval=10,
        ws_ping_timeout=None,
        # Allow websocket messages of up to 1 GiB.
        ws_max_size=1024 * 1024 * 1024,
    )
    server = Server(config=serverConf)
    server.run()

View File

@@ -1,38 +0,0 @@
import base64
from typing import Any, Dict
from websockets.asyncio import client as WSC
from websockets.exceptions import WebSocketException
import asyncio
import pickle as pkl
from ._callSpec import _CallPacket
__all__ = ["startRunner"]
async def _send(funcMap: Dict[str, Any], url):
    """
    Runner loop: connect to the router at url, announce the available
    function names, then repeatedly receive a pickled call packet, run the
    mapped function with the packet's keyword arguments and send back the
    base64-encoded pickled result.
    Errors raised by the mapped function are not handled here.
    """
    callsServed = 0
    async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None) as sock:
        try:
            # The first message from the router is this runner's numeric ID.
            runnerNumber = int(await sock.recv())
            print(f"Starting Runner, ID: {runnerNumber}")
            announcement = pkl.dumps({"methods": list(funcMap.keys())})
            await sock.send(base64.b64encode(announcement).decode("utf-8"))
            while True:
                rawPacket = await sock.recv()
                callsServed += 1
                packet: _CallPacket = pkl.loads(rawPacket)
                rule = "-" * 50
                print(rule + f"\nRunning: {packet.procedure}\nArgs: {packet.data}\nCounter: {callsServed}\n" + rule)
                result = funcMap[packet.procedure](**packet.data)
                await sock.send(base64.b64encode(pkl.dumps(result)))
        except WebSocketException:
            print(f"Closing Conncetion with Broker, total call count: {callsServed}")
            await sock.close()
def startRunner(funcMapping, host, port):
    """
    Entry point for user code: run the runner loop against the router's
    /reg websocket endpoint at host:port, exposing the functions in funcMapping.
    """
    registrationUrl = f"ws://{host}:{port}/reg"
    asyncio.run(_send(funcMapping, registrationUrl))

View File

@@ -0,0 +1,3 @@
from .runner import startRunner
__all__ = ["startRunner"]

59
harmoney/runner/runner.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import base64
import pickle as pkl
from typing import Any, Dict
from websockets.asyncio import client as WSC
from websockets.exceptions import WebSocketException
from .._callSpec import _CallPacket
__all__ = ["startRunner"]
async def _loop(funcMap: Dict[str, Any], url: str, debug=False):
    """
    Main logic function: connects to the router, takes each incoming task,
    executes it and returns the result.

    funcMap: mapping of procedure name to the callable that implements it.
    url: websocket URL of the router's registration endpoint.
    debug: when True, print the procedure name, call counter and a preview of
        the arguments (first 30 characters) for every task; otherwise print
        only a separator line per task.

    Errors raised by the mapped function (or an unknown procedure name) are
    not handled here and end the loop.
    NOTE: packets from the router are unpickled as received; connect only to
    a trusted router.
    """
    counter = 0
    async with WSC.connect(
        url, open_timeout=None, ping_interval=10, ping_timeout=None
    ) as w:
        try:
            # The first message from the router is this runner's numeric ID.
            # (Named runnerID so the builtin id() is not shadowed.)
            runnerID = int(await w.recv())
            print(f"Starting Runner with ID: {runnerID}")
            await w.send(
                base64.b64encode(pkl.dumps({"methods": list(funcMap.keys())})).decode(
                    "utf-8"
                )
            )
            while True:
                packetBytes = await w.recv()
                counter += 1
                callPk: _CallPacket = pkl.loads(packetBytes)
                if debug:
                    # Build the argument text once; add the ellipsis only when
                    # something was actually cut off (previously a text of
                    # exactly 30 characters was shown with a trailing "...").
                    argText = str(callPk.data)
                    preview = argText[:30] + ("..." if len(argText) > 30 else "")
                    print(
                        "-" * 50
                        + f"\nRunning: {callPk.procedure}\nCounter: {counter}\nArgs: {preview}\n"
                        + "-" * 50
                    )
                else:
                    print("-" * 30)
                funcOutput = funcMap[callPk.procedure](**callPk.data)
                await w.send(base64.b64encode(pkl.dumps(funcOutput)))
        except WebSocketException:
            print(f"Connection closed with Router, total call count: {counter}")
            await w.close()
def startRunner(funcMapping: Dict, host: str, port: str | int, debug: bool = False):
"""
Main function to call from the user code.
"""
try:
port = int(str(port))
except:
raise TypeError(f"Port {port} not valid.")
asyncio.run(_loop(funcMapping, f"ws://{host}:{port}/reg", debug))