add runner crash recovery
This commit is contained in:
@@ -14,7 +14,9 @@ __all__ = ["startRouter"]
|
|||||||
|
|
||||||
|
|
||||||
class _Router:
|
class _Router:
|
||||||
def __init__(self, pollingDelay: float | int = 0.5) -> None:
|
def __init__(
|
||||||
|
self, pollingDelay: float | int = 0.5, recycleOnError: bool = False
|
||||||
|
) -> None:
|
||||||
self.router = fastapi.APIRouter()
|
self.router = fastapi.APIRouter()
|
||||||
self.router.add_api_websocket_route("/reg", self.registerRunner)
|
self.router.add_api_websocket_route("/reg", self.registerRunner)
|
||||||
self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
|
self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
|
||||||
@@ -23,6 +25,7 @@ class _Router:
|
|||||||
self.returnDict = {}
|
self.returnDict = {}
|
||||||
self.doneDict = {}
|
self.doneDict = {}
|
||||||
self.pollingDelay = pollingDelay
|
self.pollingDelay = pollingDelay
|
||||||
|
self.recycleOnError = recycleOnError
|
||||||
|
|
||||||
async def registerRunner(self, wsConnection: fastapi.WebSocket):
|
async def registerRunner(self, wsConnection: fastapi.WebSocket):
|
||||||
"""
|
"""
|
||||||
@@ -38,14 +41,28 @@ class _Router:
|
|||||||
runnerID = self.runnerCount
|
runnerID = self.runnerCount
|
||||||
self.runnerCount += 1
|
self.runnerCount += 1
|
||||||
runnerCounter = 0
|
runnerCounter = 0
|
||||||
|
try:
|
||||||
while True:
|
while True:
|
||||||
# TODO: wrap this in try/except; if the connection cuts out, catch the error and add the task back into the task queue. Use a variable (addBackOnError) to decide whether a crashed task must be added back into the pool. Add this ID back into the pool of available IDs using a shared list.
|
# TODO: add this ID back into the pool of available IDs using a shared list.
|
||||||
reqID, data = await self.taskQueue.get()
|
reqID, data = await self.taskQueue.get()
|
||||||
runnerCounter += 1
|
runnerCounter += 1
|
||||||
print(f"Runr {runnerID} Counter: {runnerCounter}")
|
print(f"Runr {runnerID} Counter: {runnerCounter}")
|
||||||
await wsConnection.send_bytes(pkl.dumps(data))
|
await wsConnection.send_bytes(pkl.dumps(data))
|
||||||
retValue = await wsConnection.receive()
|
retValue = await wsConnection.receive()
|
||||||
|
if "bytes" not in retValue:
|
||||||
|
self.returnDict[reqID] = data
|
||||||
|
raise Exception(
|
||||||
|
f"Runner {runnerID} Crashed!! Check for function error logs on the Runner."
|
||||||
|
)
|
||||||
|
|
||||||
self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
|
self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
if self.recycleOnError and "reqID" in locals() and "data" in locals():
|
||||||
|
await self.taskQueue.put((reqID, data))
|
||||||
|
print("Recycled this task")
|
||||||
|
# await wsConnection.close()
|
||||||
|
print(f"Runner {runnerID} Closed")
|
||||||
|
|
||||||
async def clientRequest(self, data: _ClientPacket):
|
async def clientRequest(self, data: _ClientPacket):
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user