diff --git a/harmoney/client/__init__.py b/harmoney/client/__init__.py new file mode 100644 index 0000000..1d6af8d --- /dev/null +++ b/harmoney/client/__init__.py @@ -0,0 +1,3 @@ +from .client import Client + +__all__ = ["Client"] diff --git a/harmoney/client.py b/harmoney/client/client.py similarity index 51% rename from harmoney/client.py rename to harmoney/client/client.py index bd1541c..f9bc15d 100644 --- a/harmoney/client.py +++ b/harmoney/client/client.py @@ -1,44 +1,60 @@ import asyncio -from ._callSpec import _CallPacket import pickle as pkl + from websockets.asyncio import client as WSC +from .._callSpec import _CallPacket + __all__ = ["Client"] + class Client: - def __init__(self, host, port) -> None: + def __init__(self, host: str, port: str | int) -> None: self._wsURL = f"ws://{host}:{port}/cliReq/" self.tasks = [] - def singleCall(self, function, **kwargs): + def singleCall(self, function: str, **kwargs): """ Performs single call with the provided function and args """ - self.addCall(_CallPacket(procedure=function, data=kwargs)) + self.addCall(function, **kwargs) return self.runAllCalls()[0] - def addCall(self, function, **kwargs): + def addCall(self, function: str, **kwargs) -> int: """ Adds a task to call queue. + Returns current length of call queue. """ self.tasks.append((_CallPacket(procedure=function, data=kwargs))) + return len(self.tasks) - async def _runAllCalls(self, callDelay=0.01): + async def _runAllCalls(self) -> list: """ Logic function to communicate with the router. """ - print(f"Total Calls: {len(self.tasks)}") - async with WSC.connect(self._wsURL+f"{len(self.tasks)}", open_timeout=None, ping_interval=10, ping_timeout=None) as ws: + if len(self.tasks) == 0: + return [] + print(f"Running {len(self.tasks)} functions") + async with WSC.connect( + self._wsURL + f"{len(self.tasks)}", + open_timeout=None, + ping_interval=10, + ping_timeout=None, + ) as ws: ackCount = int(await ws.recv()) assert ackCount == len(self.tasks), "Comms not proper..." await ws.send(pkl.dumps(self.tasks)) returnData = await ws.recv() returnData = pkl.loads(returnData) - self.tasks=[] + self.tasks.clear() return returnData - def runAllCalls(self, callDelay=0.01): + def runAllCalls(self): """ User facing function to remotely execute queued tasks. """ - return asyncio.run(self._runAllCalls(callDelay=callDelay)) + return asyncio.run(self._runAllCalls()) + + def clearQueue(self) -> bool: + self.tasks.clear() + return True diff --git a/harmoney/router/__init__.py b/harmoney/router/__init__.py new file mode 100644 index 0000000..66e6088 --- /dev/null +++ b/harmoney/router/__init__.py @@ -0,0 +1,3 @@ +from .router import startRouter + +__all__ = ["startRouter"] diff --git a/harmoney/router.py b/harmoney/router/router.py similarity index 58% rename from harmoney/router.py rename to harmoney/router/router.py index e435fd5..964048a 100644 --- a/harmoney/router.py +++ b/harmoney/router/router.py @@ -1,48 +1,53 @@ import asyncio import base64 -from threading import Thread -from typing import List -from uvicorn import Config, Server -import fastapi -from uvicorn.config import LOG_LEVELS import pickle as pkl import uuid -from ._callSpec import _ClientPacket, _CallPacket +from threading import Thread + +import fastapi +from uvicorn import Config, Server +from uvicorn.config import LOG_LEVELS + +from .._callSpec import _ClientPacket __all__ = ["startRouter"] + class _Router: - def __init__(self, pollingDelay=0.5) -> None: + def __init__(self, pollingDelay: float | int = 0.5) -> None: self.router = fastapi.APIRouter() self.router.add_api_websocket_route("/reg", self.registerRunner) self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest) self.taskQueue = asyncio.Queue() - self.runnerCount=0 + self.runnerCount = 0 self.returnDict = {} self.doneDict = {} self.pollingDelay = pollingDelay - async def registerRunner(self, wsConnection: fastapi.WebSocket): + async def registerRunner(self, wsConnection: fastapi.WebSocket): """ Method which queries an available task and sends the data to the attached runner. """ await wsConnection.accept() await wsConnection.send_text(str(self.runnerCount)) - methods=await wsConnection.receive() + methods = await wsConnection.receive() methods = pkl.loads(base64.b64decode(methods["text"])) - print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}") - runnerID=self.runnerCount - self.runnerCount+=1 + print( + f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}" + ) + runnerID = self.runnerCount + self.runnerCount += 1 runnerCounter = 0 while True: - reqID, data = await self.taskQueue.get() - runnerCounter+=1 + # try except here, if connection cuts out, catch and add the task back into the task queue. have a var addBackOnError to decide if crashed task must be added back into the pool. add this id back into available id pools using a shared list. + reqID, data = await self.taskQueue.get() + runnerCounter += 1 print(f"Runr {runnerID} Counter: {runnerCounter}") await wsConnection.send_bytes(pkl.dumps(data)) retValue = await wsConnection.receive() self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"])) - async def clientRequest(self, data:_ClientPacket): + async def clientRequest(self, data: _ClientPacket): """ Method to handle single request, adds the task to queue and awaits for result. To be deprecated for better task handling. @@ -52,49 +57,53 @@ class _Router: await self.taskQueue.put((reqID, callPacket)) while reqID not in self.returnDict: await asyncio.sleep(self.pollingDelay) - returnValue = self.returnDict[reqID] - self.returnDict.pop(reqID) + returnValue = self.returnDict.pop(reqID) return returnValue - async def multiClientRequest(self, wsConn:fastapi.WebSocket, count:int): + async def multiClientRequest(self, wsConn: fastapi.WebSocket, count: int): """ Method accepts a task list and adds them to the queue. Returns the results to client. """ await wsConn.accept() - softLimit=50 + softLimit = 50 await wsConn.send_text(str(count)) reqID = uuid.uuid4().hex - self.returnDict[reqID] = [0]*count - self.doneDict[reqID] = [0]*count - print(f"Received {count} tasks") + self.returnDict[reqID] = [0] * count + self.doneDict[reqID] = [0] * count + # print(f"Received {count} tasks") taskBytes = await wsConn.receive_bytes() taskPackets = pkl.loads(taskBytes) softLimitItr = 0 for task in range(len(taskPackets)): - while (task > (softLimitItr+softLimit)) and not self.doneDict[reqID][softLimitItr]==1: + while (task > (softLimitItr + softLimit)) and not self.doneDict[reqID][ + softLimitItr + ] == 1: await asyncio.sleep(1) - if self.doneDict[reqID][softLimitItr]==1: - softLimitItr+=1 - t=Thread(target=self._worker, args=(reqID, task, taskPackets[task])) - t.daemon=True + if self.doneDict[reqID][softLimitItr] == 1: + softLimitItr += 1 + t = Thread(target=self._worker, args=(reqID, task, taskPackets[task])) + t.daemon = True t.start() while not all(self.doneDict[reqID]): await asyncio.sleep(1) await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID])) self.returnDict.pop(reqID) - def _worker(self, id, idx, data:_ClientPacket): + def _worker(self, id, idx, data: _ClientPacket): """ Thread worker to handle one task. To be depricated for better task handling. """ retVal = asyncio.run(self.clientRequest(data)) - self.returnDict[id][idx]=retVal - self.doneDict[id][idx]=1 + self.returnDict[id][idx] = retVal + self.doneDict[id][idx] = 1 return -def startRouter(host, port, pollingDelay=0.1, logLevel=3): + +def startRouter( + host: str, port: str | int, pollingDelay: float | int = 0.1, logLevel: int = 3 +): """ Main function to start the router system. """ @@ -102,6 +111,14 @@ def startRouter(host, port, pollingDelay=0.1, logLevel=3): app = fastapi.FastAPI() app.include_router(br.router) level = list(LOG_LEVELS.keys())[logLevel] - serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS[level], ws_ping_interval=10, ws_ping_timeout=None, ws_max_size=1024*1024*1024) + serverConf = Config( + app=app, + host=host, + port=int(port), + log_level=LOG_LEVELS[level], + ws_ping_interval=10, + ws_ping_timeout=None, + ws_max_size=1024 * 1024 * 1024, + ) server = Server(config=serverConf) server.run() diff --git a/harmoney/runner.py b/harmoney/runner.py deleted file mode 100644 index b28f8c2..0000000 --- a/harmoney/runner.py +++ /dev/null @@ -1,38 +0,0 @@ -import base64 -from typing import Any, Dict -from websockets.asyncio import client as WSC -from websockets.exceptions import WebSocketException -import asyncio -import pickle as pkl -from ._callSpec import _CallPacket - -__all__ = ["startRunner"] - -async def _send(funcMap: Dict[str, Any], url): - """ - Main logic funcion, connects to the router, takes the incoming task, executes and returns the result. - To improve error handling from the mapped function side. - """ - counter=0 - async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w: - try: - id = await w.recv() - id = int(id) - print(f"Starting Runner, ID: {id}") - await w.send(base64.b64encode(pkl.dumps({"methods":list(funcMap.keys())})).decode("utf-8")) - while True: - packetBytes=await w.recv() - counter+=1 - callPk:_CallPacket = pkl.loads(packetBytes) - print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\nCounter: {counter}\n" + "-"*50) - funcOutput = funcMap[callPk.procedure](**callPk.data) - await w.send(base64.b64encode(pkl.dumps(funcOutput))) - except WebSocketException as e: - print(f"Closing Conncetion with Broker, total call count: {counter}") - await w.close() - -def startRunner(funcMapping, host, port): - """ - Main function to call from the user code. - """ - asyncio.run(_send(funcMapping, f"ws://{host}:{port}/reg")) diff --git a/harmoney/runner/__init__.py b/harmoney/runner/__init__.py new file mode 100644 index 0000000..8a46410 --- /dev/null +++ b/harmoney/runner/__init__.py @@ -0,0 +1,3 @@ +from .runner import startRunner + +__all__ = ["startRunner"] diff --git a/harmoney/runner/runner.py b/harmoney/runner/runner.py new file mode 100644 index 0000000..1e936f1 --- /dev/null +++ b/harmoney/runner/runner.py @@ -0,0 +1,59 @@ +import asyncio +import base64 +import pickle as pkl +from typing import Any, Dict + +from websockets.asyncio import client as WSC +from websockets.exceptions import WebSocketException + +from .._callSpec import _CallPacket + +__all__ = ["startRunner"] + + +async def _loop(funcMap: Dict[str, Any], url: str, debug=False): + """ + Main logic funcion, connects to the router, takes the incoming task, executes and returns the result. + To improve error handling from the mapped function side. + """ + counter = 0 + async with WSC.connect( + url, open_timeout=None, ping_interval=10, ping_timeout=None + ) as w: + try: + id = await w.recv() + id = int(id) + print(f"Starting Runner with ID: {id}") + await w.send( + base64.b64encode(pkl.dumps({"methods": list(funcMap.keys())})).decode( + "utf-8" + ) + ) + while True: + packetBytes = await w.recv() + counter += 1 + callPk: _CallPacket = pkl.loads(packetBytes) + if debug: + print( + "-" * 50 + + f"\nRunning: {callPk.procedure}\nCounter: {counter}\nArgs: {str(callPk.data)[:30]}{'...' if len(str(callPk.data)) >= 30 else ''}\n" + + "-" * 50 + ) + else: + print("-" * 30) + funcOutput = funcMap[callPk.procedure](**callPk.data) + await w.send(base64.b64encode(pkl.dumps(funcOutput))) + except WebSocketException: + print(f"Connection closed with Router, total call count: {counter}") + await w.close() + + +def startRunner(funcMapping: Dict, host: str, port: str | int, debug: bool = False): + """ + Main function to call from the user code. + """ + try: + port = int(str(port)) + except: + raise TypeError(f"Port {port} not valid.") + asyncio.run(_loop(funcMapping, f"ws://{host}:{port}/reg", debug))