From f09756d591134996892377dabf4e43eaa2867859 Mon Sep 17 00:00:00 2001 From: Phani Pavan K Date: Thu, 13 Mar 2025 14:15:36 +0530 Subject: [PATCH] rewrote client-router mcall arch, added funcdocs, removed requests dep --- .gitignore | 1 + harmoney/client.py | 59 ++++++++++++++++++++------------------- harmoney/router.py | 69 ++++++++++++++++++++++++++++++++++++++++------ harmoney/runner.py | 8 +++++- pyproject.toml | 8 +++--- requirements.txt | 1 - 6 files changed, 103 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index 0dbf2f2..4c4e080 100644 --- a/.gitignore +++ b/.gitignore @@ -168,3 +168,4 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +.testCache diff --git a/harmoney/client.py b/harmoney/client.py index d65fc5e..bd1541c 100644 --- a/harmoney/client.py +++ b/harmoney/client.py @@ -1,43 +1,44 @@ -import time -import requests as req +import asyncio from ._callSpec import _CallPacket import pickle as pkl -import base64 -from threading import Thread +from websockets.asyncio import client as WSC __all__ = ["Client"] class Client: def __init__(self, host, port) -> None: - self._url = f"http://{host}:{port}/cliReq" + self._wsURL = f"ws://{host}:{port}/cliReq/" self.tasks = [] def singleCall(self, function, **kwargs): - callPacket = _CallPacket(procedure=function, data=kwargs) - payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")} - resp = req.post(self._url, json=payload) - return pkl.loads(base64.b64decode(resp.text)) + """ + Performs single call with the provided function and args + """ + self.addCall(_CallPacket(procedure=function, data=kwargs)) + return self.runAllCalls()[0] def addCall(self, function, **kwargs): - self.tasks.append((function, kwargs)) - print(f"Total in Queue: {len(self.tasks)}") + """ + Adds a task to call queue. + """ + self.tasks.append((_CallPacket(procedure=function, data=kwargs))) + + async def _runAllCalls(self, callDelay=0.01): + """ + Logic function to communicate with the router. + """ + print(f"Total Calls: {len(self.tasks)}") + async with WSC.connect(self._wsURL+f"{len(self.tasks)}", open_timeout=None, ping_interval=10, ping_timeout=None) as ws: + ackCount = int(await ws.recv()) + assert ackCount == len(self.tasks), "Comms not proper..." + await ws.send(pkl.dumps(self.tasks)) + returnData = await ws.recv() + returnData = pkl.loads(returnData) + self.tasks=[] + return returnData def runAllCalls(self, callDelay=0.01): - if len(self.tasks) == 0: - return [] - self.returnValues = [0]*len(self.tasks) - self.done = [0] * len(self.tasks) - for callIDX in range(len(self.tasks)): - t = Thread(target=self._threadWorker, args=[callIDX, self.tasks[callIDX]]) - t.start() - time.sleep(callDelay) - while not all(self.done): - time.sleep(1) - self.tasks = [] - return self.returnValues - - def _threadWorker(self, callIDX, payload): - # print(callIDX, payload) - ret = self.singleCall(function=payload[0], **payload[1]) - self.returnValues[callIDX] = ret - self.done[callIDX] =1 + """ + User facing function to remotely execute queued tasks. + """ + return asyncio.run(self._runAllCalls(callDelay=callDelay)) diff --git a/harmoney/router.py b/harmoney/router.py index 03ff7ce..e435fd5 100644 --- a/harmoney/router.py +++ b/harmoney/router.py @@ -1,11 +1,13 @@ import asyncio import base64 +from threading import Thread +from typing import List from uvicorn import Config, Server import fastapi from uvicorn.config import LOG_LEVELS import pickle as pkl import uuid -from ._callSpec import _ClientPacket +from ._callSpec import _ClientPacket, _CallPacket __all__ = ["startRouter"] @@ -13,42 +15,93 @@ class _Router: def __init__(self, pollingDelay=0.5) -> None: self.router = fastapi.APIRouter() self.router.add_api_websocket_route("/reg", self.registerRunner) - self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"]) + self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest) self.taskQueue = asyncio.Queue() self.runnerCount=0 self.returnDict = {} + self.doneDict = {} self.pollingDelay = pollingDelay - async def registerRunner(self, wsConnection: fastapi.WebSocket): + """ + Method which queries an available task and sends the data to the attached runner. + """ await wsConnection.accept() await wsConnection.send_text(str(self.runnerCount)) methods=await wsConnection.receive() methods = pkl.loads(base64.b64decode(methods["text"])) print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}") + runnerID=self.runnerCount self.runnerCount+=1 + runnerCounter = 0 while True: reqID, data = await self.taskQueue.get() + runnerCounter+=1 + print(f"Runr {runnerID} Counter: {runnerCounter}") await wsConnection.send_bytes(pkl.dumps(data)) retValue = await wsConnection.receive() - self.returnDict[reqID] = retValue["bytes"] - print(f"Tasks left: {self.taskQueue.qsize()}") + self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"])) async def clientRequest(self, data:_ClientPacket): + """ + Method to handle single request, adds the task to queue and awaits for result. + To be deprecated for better task handling. + """ reqID = uuid.uuid4().hex - callPacket = pkl.loads(base64.b64decode(data.data)) + callPacket = data await self.taskQueue.put((reqID, callPacket)) while reqID not in self.returnDict: await asyncio.sleep(self.pollingDelay) - # await asyncio.sleep(1) returnValue = self.returnDict[reqID] + self.returnDict.pop(reqID) return returnValue + async def multiClientRequest(self, wsConn:fastapi.WebSocket, count:int): + """ + Method accepts a task list and adds them to the queue. + Returns the results to client. + """ + await wsConn.accept() + softLimit=50 + await wsConn.send_text(str(count)) + reqID = uuid.uuid4().hex + self.returnDict[reqID] = [0]*count + self.doneDict[reqID] = [0]*count + print(f"Received {count} tasks") + taskBytes = await wsConn.receive_bytes() + taskPackets = pkl.loads(taskBytes) + softLimitItr = 0 + for task in range(len(taskPackets)): + while (task > (softLimitItr+softLimit)) and not self.doneDict[reqID][softLimitItr]==1: + await asyncio.sleep(1) + if self.doneDict[reqID][softLimitItr]==1: + softLimitItr+=1 + t=Thread(target=self._worker, args=(reqID, task, taskPackets[task])) + t.daemon=True + t.start() + while not all(self.doneDict[reqID]): + await asyncio.sleep(1) + await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID])) + self.returnDict.pop(reqID) + + def _worker(self, id, idx, data:_ClientPacket): + """ + Thread worker to handle one task. + To be depricated for better task handling. + """ + retVal = asyncio.run(self.clientRequest(data)) + self.returnDict[id][idx]=retVal + self.doneDict[id][idx]=1 + return + def startRouter(host, port, pollingDelay=0.1, logLevel=3): + """ + Main function to start the router system. + """ br = _Router(pollingDelay=pollingDelay) app = fastapi.FastAPI() app.include_router(br.router) level = list(LOG_LEVELS.keys())[logLevel] - serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS[level], ws_ping_interval=10, ws_ping_timeout=None) + serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS[level], ws_ping_interval=10, ws_ping_timeout=None, ws_max_size=1024*1024*1024) server = Server(config=serverConf) server.run() diff --git a/harmoney/runner.py b/harmoney/runner.py index 2de9b77..b28f8c2 100644 --- a/harmoney/runner.py +++ b/harmoney/runner.py @@ -1,6 +1,5 @@ import base64 from typing import Any, Dict -# from fastapi import WebSocketException from websockets.asyncio import client as WSC from websockets.exceptions import WebSocketException import asyncio @@ -10,6 +9,10 @@ from ._callSpec import _CallPacket __all__ = ["startRunner"] async def _send(funcMap: Dict[str, Any], url): + """ + Main logic funcion, connects to the router, takes the incoming task, executes and returns the result. + To improve error handling from the mapped function side. + """ counter=0 async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w: try: @@ -29,4 +32,7 @@ async def _send(funcMap: Dict[str, Any], url): await w.close() def startRunner(funcMapping, host, port): + """ + Main function to call from the user code. + """ asyncio.run(_send(funcMapping, f"ws://{host}:{port}/reg")) diff --git a/pyproject.toml b/pyproject.toml index 1603c23..a0e7093 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,13 +4,13 @@ build-backend = "setuptools.build_meta" [project] name = "harmoney" -version = "0.2.0" -description = "Simple Remote Function Calling Framework" +version = "0.3.0" +description = "Scalable Remote Function Calling Framework" authors = [{name = "Phani Pavan K", email = "kphanipavan@gmail.com"}] license = {text = "AGPLv3"} readme = "README.md" requires-python = ">=3.8" -dependencies = ["websockets>=15.0", "uvicorn>=0.34.0", "fastapi>=0.115.8", "pydantic>=2.10.6", "requests>=2.31.0"] +dependencies = ["websockets>=15.0", "uvicorn>=0.34.0", "fastapi>=0.115.8", "pydantic>=2.10.6"] [project.urls] -Homepage = "https://git.pvnweb.dedyn.io/phanipavank/harmoney" +Homepage = "https://github.com/kphanipavan/harmoney" diff --git a/requirements.txt b/requirements.txt index f185d43..1ebc74d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ fastapi pydantic websockets -requests