restructured, fixed #7, #1, #9

This commit is contained in:
Phani Pavan K
2026-02-19 18:21:26 +05:30
parent 0b8006467f
commit f2cc63828a
7 changed files with 145 additions and 82 deletions

View File

@@ -0,0 +1,3 @@
from .client import Client
__all__ = ["Client"]

View File

@@ -1,44 +1,60 @@
import asyncio import asyncio
from ._callSpec import _CallPacket
import pickle as pkl import pickle as pkl
from websockets.asyncio import client as WSC from websockets.asyncio import client as WSC
from .._callSpec import _CallPacket
__all__ = ["Client"] __all__ = ["Client"]
class Client: class Client:
def __init__(self, host, port) -> None: def __init__(self, host: str, port: str | int) -> None:
self._wsURL = f"ws://{host}:{port}/cliReq/" self._wsURL = f"ws://{host}:{port}/cliReq/"
self.tasks = [] self.tasks = []
def singleCall(self, function, **kwargs): def singleCall(self, function: str, **kwargs):
""" """
Performs single call with the provided function and args Performs single call with the provided function and args
""" """
self.addCall(_CallPacket(procedure=function, data=kwargs)) self.addCall(function, **kwargs)
return self.runAllCalls()[0] return self.runAllCalls()[0]
def addCall(self, function, **kwargs): def addCall(self, function: str, **kwargs) -> int:
""" """
Adds a task to call queue. Adds a task to call queue.
Returns current length of call queue.
""" """
self.tasks.append((_CallPacket(procedure=function, data=kwargs))) self.tasks.append((_CallPacket(procedure=function, data=kwargs)))
return len(self.tasks)
async def _runAllCalls(self, callDelay=0.01): async def _runAllCalls(self) -> list:
""" """
Logic function to communicate with the router. Logic function to communicate with the router.
""" """
print(f"Total Calls: {len(self.tasks)}") if len(self.tasks) == 0:
async with WSC.connect(self._wsURL+f"{len(self.tasks)}", open_timeout=None, ping_interval=10, ping_timeout=None) as ws: return []
print(f"Running {len(self.tasks)} functions")
async with WSC.connect(
self._wsURL + f"{len(self.tasks)}",
open_timeout=None,
ping_interval=10,
ping_timeout=None,
) as ws:
ackCount = int(await ws.recv()) ackCount = int(await ws.recv())
assert ackCount == len(self.tasks), "Comms not proper..." assert ackCount == len(self.tasks), "Comms not proper..."
await ws.send(pkl.dumps(self.tasks)) await ws.send(pkl.dumps(self.tasks))
returnData = await ws.recv() returnData = await ws.recv()
returnData = pkl.loads(returnData) returnData = pkl.loads(returnData)
self.tasks=[] self.tasks.clear()
return returnData return returnData
def runAllCalls(self, callDelay=0.01): def runAllCalls(self):
""" """
User facing function to remotely execute queued tasks. User facing function to remotely execute queued tasks.
""" """
return asyncio.run(self._runAllCalls(callDelay=callDelay)) return asyncio.run(self._runAllCalls())
def clearQueue(self) -> bool:
self.tasks.clear()
return True

View File

@@ -0,0 +1,3 @@
from .router import startRouter
__all__ = ["startRouter"]

View File

@@ -1,48 +1,53 @@
import asyncio import asyncio
import base64 import base64
from threading import Thread
from typing import List
from uvicorn import Config, Server
import fastapi
from uvicorn.config import LOG_LEVELS
import pickle as pkl import pickle as pkl
import uuid import uuid
from ._callSpec import _ClientPacket, _CallPacket from threading import Thread
import fastapi
from uvicorn import Config, Server
from uvicorn.config import LOG_LEVELS
from .._callSpec import _ClientPacket
__all__ = ["startRouter"] __all__ = ["startRouter"]
class _Router: class _Router:
def __init__(self, pollingDelay=0.5) -> None: def __init__(self, pollingDelay: float | int = 0.5) -> None:
self.router = fastapi.APIRouter() self.router = fastapi.APIRouter()
self.router.add_api_websocket_route("/reg", self.registerRunner) self.router.add_api_websocket_route("/reg", self.registerRunner)
self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest) self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
self.taskQueue = asyncio.Queue() self.taskQueue = asyncio.Queue()
self.runnerCount=0 self.runnerCount = 0
self.returnDict = {} self.returnDict = {}
self.doneDict = {} self.doneDict = {}
self.pollingDelay = pollingDelay self.pollingDelay = pollingDelay
async def registerRunner(self, wsConnection: fastapi.WebSocket): async def registerRunner(self, wsConnection: fastapi.WebSocket):
""" """
Method which queries an available task and sends the data to the attached runner. Method which queries an available task and sends the data to the attached runner.
""" """
await wsConnection.accept() await wsConnection.accept()
await wsConnection.send_text(str(self.runnerCount)) await wsConnection.send_text(str(self.runnerCount))
methods=await wsConnection.receive() methods = await wsConnection.receive()
methods = pkl.loads(base64.b64decode(methods["text"])) methods = pkl.loads(base64.b64decode(methods["text"]))
print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}") print(
runnerID=self.runnerCount f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}"
self.runnerCount+=1 )
runnerID = self.runnerCount
self.runnerCount += 1
runnerCounter = 0 runnerCounter = 0
while True: while True:
reqID, data = await self.taskQueue.get() # try except here, if connection cuts out, catch and add the task back into the task queue. have a var addBackOnError to decide if crashed task must be added back into the pool. add this id back into available id pools using a shared list.
runnerCounter+=1 reqID, data = await self.taskQueue.get()
runnerCounter += 1
print(f"Runr {runnerID} Counter: {runnerCounter}") print(f"Runr {runnerID} Counter: {runnerCounter}")
await wsConnection.send_bytes(pkl.dumps(data)) await wsConnection.send_bytes(pkl.dumps(data))
retValue = await wsConnection.receive() retValue = await wsConnection.receive()
self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"])) self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
async def clientRequest(self, data:_ClientPacket): async def clientRequest(self, data: _ClientPacket):
""" """
Method to handle single request, adds the task to queue and awaits for result. Method to handle single request, adds the task to queue and awaits for result.
To be deprecated for better task handling. To be deprecated for better task handling.
@@ -52,49 +57,53 @@ class _Router:
await self.taskQueue.put((reqID, callPacket)) await self.taskQueue.put((reqID, callPacket))
while reqID not in self.returnDict: while reqID not in self.returnDict:
await asyncio.sleep(self.pollingDelay) await asyncio.sleep(self.pollingDelay)
returnValue = self.returnDict[reqID] returnValue = self.returnDict.pop(reqID)
self.returnDict.pop(reqID)
return returnValue return returnValue
async def multiClientRequest(self, wsConn:fastapi.WebSocket, count:int): async def multiClientRequest(self, wsConn: fastapi.WebSocket, count: int):
""" """
Method accepts a task list and adds them to the queue. Method accepts a task list and adds them to the queue.
Returns the results to client. Returns the results to client.
""" """
await wsConn.accept() await wsConn.accept()
softLimit=50 softLimit = 50
await wsConn.send_text(str(count)) await wsConn.send_text(str(count))
reqID = uuid.uuid4().hex reqID = uuid.uuid4().hex
self.returnDict[reqID] = [0]*count self.returnDict[reqID] = [0] * count
self.doneDict[reqID] = [0]*count self.doneDict[reqID] = [0] * count
print(f"Received {count} tasks") # print(f"Received {count} tasks")
taskBytes = await wsConn.receive_bytes() taskBytes = await wsConn.receive_bytes()
taskPackets = pkl.loads(taskBytes) taskPackets = pkl.loads(taskBytes)
softLimitItr = 0 softLimitItr = 0
for task in range(len(taskPackets)): for task in range(len(taskPackets)):
while (task > (softLimitItr+softLimit)) and not self.doneDict[reqID][softLimitItr]==1: while (task > (softLimitItr + softLimit)) and not self.doneDict[reqID][
softLimitItr
] == 1:
await asyncio.sleep(1) await asyncio.sleep(1)
if self.doneDict[reqID][softLimitItr]==1: if self.doneDict[reqID][softLimitItr] == 1:
softLimitItr+=1 softLimitItr += 1
t=Thread(target=self._worker, args=(reqID, task, taskPackets[task])) t = Thread(target=self._worker, args=(reqID, task, taskPackets[task]))
t.daemon=True t.daemon = True
t.start() t.start()
while not all(self.doneDict[reqID]): while not all(self.doneDict[reqID]):
await asyncio.sleep(1) await asyncio.sleep(1)
await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID])) await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID]))
self.returnDict.pop(reqID) self.returnDict.pop(reqID)
def _worker(self, id, idx, data:_ClientPacket): def _worker(self, id, idx, data: _ClientPacket):
""" """
Thread worker to handle one task. Thread worker to handle one task.
To be depricated for better task handling. To be depricated for better task handling.
""" """
retVal = asyncio.run(self.clientRequest(data)) retVal = asyncio.run(self.clientRequest(data))
self.returnDict[id][idx]=retVal self.returnDict[id][idx] = retVal
self.doneDict[id][idx]=1 self.doneDict[id][idx] = 1
return return
def startRouter(host, port, pollingDelay=0.1, logLevel=3):
def startRouter(
host: str, port: str | int, pollingDelay: float | int = 0.1, logLevel: int = 3
):
""" """
Main function to start the router system. Main function to start the router system.
""" """
@@ -102,6 +111,14 @@ def startRouter(host, port, pollingDelay=0.1, logLevel=3):
app = fastapi.FastAPI() app = fastapi.FastAPI()
app.include_router(br.router) app.include_router(br.router)
level = list(LOG_LEVELS.keys())[logLevel] level = list(LOG_LEVELS.keys())[logLevel]
serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS[level], ws_ping_interval=10, ws_ping_timeout=None, ws_max_size=1024*1024*1024) serverConf = Config(
app=app,
host=host,
port=int(port),
log_level=LOG_LEVELS[level],
ws_ping_interval=10,
ws_ping_timeout=None,
ws_max_size=1024 * 1024 * 1024,
)
server = Server(config=serverConf) server = Server(config=serverConf)
server.run() server.run()

View File

@@ -1,38 +0,0 @@
import base64
from typing import Any, Dict
from websockets.asyncio import client as WSC
from websockets.exceptions import WebSocketException
import asyncio
import pickle as pkl
from ._callSpec import _CallPacket
__all__ = ["startRunner"]
async def _send(funcMap: Dict[str, Any], url):
"""
Main logic funcion, connects to the router, takes the incoming task, executes and returns the result.
To improve error handling from the mapped function side.
"""
counter=0
async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
try:
id = await w.recv()
id = int(id)
print(f"Starting Runner, ID: {id}")
await w.send(base64.b64encode(pkl.dumps({"methods":list(funcMap.keys())})).decode("utf-8"))
while True:
packetBytes=await w.recv()
counter+=1
callPk:_CallPacket = pkl.loads(packetBytes)
print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\nCounter: {counter}\n" + "-"*50)
funcOutput = funcMap[callPk.procedure](**callPk.data)
await w.send(base64.b64encode(pkl.dumps(funcOutput)))
except WebSocketException as e:
print(f"Closing Conncetion with Broker, total call count: {counter}")
await w.close()
def startRunner(funcMapping, host, port):
"""
Main function to call from the user code.
"""
asyncio.run(_send(funcMapping, f"ws://{host}:{port}/reg"))

View File

@@ -0,0 +1,3 @@
from .runner import startRunner
__all__ = ["startRunner"]

59
harmoney/runner/runner.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import base64
import pickle as pkl
from typing import Any, Dict
from websockets.asyncio import client as WSC
from websockets.exceptions import WebSocketException
from .._callSpec import _CallPacket
__all__ = ["startRunner"]
async def _loop(funcMap: Dict[str, Any], url: str, debug=False):
"""
Main logic funcion, connects to the router, takes the incoming task, executes and returns the result.
To improve error handling from the mapped function side.
"""
counter = 0
async with WSC.connect(
url, open_timeout=None, ping_interval=10, ping_timeout=None
) as w:
try:
id = await w.recv()
id = int(id)
print(f"Starting Runner with ID: {id}")
await w.send(
base64.b64encode(pkl.dumps({"methods": list(funcMap.keys())})).decode(
"utf-8"
)
)
while True:
packetBytes = await w.recv()
counter += 1
callPk: _CallPacket = pkl.loads(packetBytes)
if debug:
print(
"-" * 50
+ f"\nRunning: {callPk.procedure}\nCounter: {counter}\nArgs: {str(callPk.data)[:30]}{'...' if len(str(callPk.data)) >= 30 else ''}\n"
+ "-" * 50
)
else:
print("-" * 30)
funcOutput = funcMap[callPk.procedure](**callPk.data)
await w.send(base64.b64encode(pkl.dumps(funcOutput)))
except WebSocketException:
print(f"Connection closed with Router, total call count: {counter}")
await w.close()
def startRunner(funcMapping: Dict, host: str, port: str | int, debug: bool = False):
"""
Main function to call from the user code.
"""
try:
port = int(str(port))
except:
raise TypeError(f"Port {port} not valid.")
asyncio.run(_loop(funcMapping, f"ws://{host}:{port}/reg", debug))