diff --git a/harmoney/router/router.py b/harmoney/router/router.py index 964048a..947c2e4 100644 --- a/harmoney/router/router.py +++ b/harmoney/router/router.py @@ -14,7 +14,9 @@ __all__ = ["startRouter"] class _Router: - def __init__(self, pollingDelay: float | int = 0.5) -> None: + def __init__( + self, pollingDelay: float | int = 0.5, recycleOnError: bool = False + ) -> None: self.router = fastapi.APIRouter() self.router.add_api_websocket_route("/reg", self.registerRunner) self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest) @@ -23,6 +25,7 @@ class _Router: self.returnDict = {} self.doneDict = {} self.pollingDelay = pollingDelay + self.recycleOnError = recycleOnError async def registerRunner(self, wsConnection: fastapi.WebSocket): """ @@ -38,14 +41,28 @@ class _Router: runnerID = self.runnerCount self.runnerCount += 1 runnerCounter = 0 - while True: - # try except here, if connection cuts out, catch and add the task back into the task queue. have a var addBackOnError to decide if crashed task must be added back into the pool. add this id back into available id pools using a shared list. - reqID, data = await self.taskQueue.get() - runnerCounter += 1 - print(f"Runr {runnerID} Counter: {runnerCounter}") - await wsConnection.send_bytes(pkl.dumps(data)) - retValue = await wsConnection.receive() - self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"])) + try: + while True: + # add this id back into available id pools using a shared list. + reqID, data = await self.taskQueue.get() + runnerCounter += 1 + print(f"Runr {runnerID} Counter: {runnerCounter}") + await wsConnection.send_bytes(pkl.dumps(data)) + retValue = await wsConnection.receive() + if "bytes" not in retValue: + self.returnDict[reqID] = data + raise Exception( + f"Runner {runnerID} Crashed!! Check for function error logs on the Runner." + ) + + self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"])) + except Exception as e: + print(e) + if self.recycleOnError and "reqID" in locals() and "data" in locals(): + await self.taskQueue.put((reqID, data)) + print("Recycled this task") + # await wsConnection.close() + print(f"Runner {runnerID} Closed") async def clientRequest(self, data: _ClientPacket): """