rewrote client-router mcall arch, added funcdocs, removed requests dep
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -168,3 +168,4 @@ cython_debug/
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
.testCache
|
||||
|
||||
@@ -1,43 +1,44 @@
|
||||
import time
|
||||
import requests as req
|
||||
import asyncio
|
||||
from ._callSpec import _CallPacket
|
||||
import pickle as pkl
|
||||
import base64
|
||||
from threading import Thread
|
||||
from websockets.asyncio import client as WSC
|
||||
|
||||
__all__ = ["Client"]
|
||||
|
||||
class Client:
|
||||
def __init__(self, host, port) -> None:
|
||||
self._url = f"http://{host}:{port}/cliReq"
|
||||
self._wsURL = f"ws://{host}:{port}/cliReq/"
|
||||
self.tasks = []
|
||||
|
||||
def singleCall(self, function, **kwargs):
|
||||
callPacket = _CallPacket(procedure=function, data=kwargs)
|
||||
payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")}
|
||||
resp = req.post(self._url, json=payload)
|
||||
return pkl.loads(base64.b64decode(resp.text))
|
||||
"""
|
||||
Performs single call with the provided function and args
|
||||
"""
|
||||
self.addCall(_CallPacket(procedure=function, data=kwargs))
|
||||
return self.runAllCalls()[0]
|
||||
|
||||
def addCall(self, function, **kwargs):
|
||||
self.tasks.append((function, kwargs))
|
||||
print(f"Total in Queue: {len(self.tasks)}")
|
||||
"""
|
||||
Adds a task to call queue.
|
||||
"""
|
||||
self.tasks.append((_CallPacket(procedure=function, data=kwargs)))
|
||||
|
||||
async def _runAllCalls(self, callDelay=0.01):
|
||||
"""
|
||||
Logic function to communicate with the router.
|
||||
"""
|
||||
print(f"Total Calls: {len(self.tasks)}")
|
||||
async with WSC.connect(self._wsURL+f"{len(self.tasks)}", open_timeout=None, ping_interval=10, ping_timeout=None) as ws:
|
||||
ackCount = int(await ws.recv())
|
||||
assert ackCount == len(self.tasks), "Comms not proper..."
|
||||
await ws.send(pkl.dumps(self.tasks))
|
||||
returnData = await ws.recv()
|
||||
returnData = pkl.loads(returnData)
|
||||
self.tasks=[]
|
||||
return returnData
|
||||
|
||||
def runAllCalls(self, callDelay=0.01):
|
||||
if len(self.tasks) == 0:
|
||||
return []
|
||||
self.returnValues = [0]*len(self.tasks)
|
||||
self.done = [0] * len(self.tasks)
|
||||
for callIDX in range(len(self.tasks)):
|
||||
t = Thread(target=self._threadWorker, args=[callIDX, self.tasks[callIDX]])
|
||||
t.start()
|
||||
time.sleep(callDelay)
|
||||
while not all(self.done):
|
||||
time.sleep(1)
|
||||
self.tasks = []
|
||||
return self.returnValues
|
||||
|
||||
def _threadWorker(self, callIDX, payload):
|
||||
# print(callIDX, payload)
|
||||
ret = self.singleCall(function=payload[0], **payload[1])
|
||||
self.returnValues[callIDX] = ret
|
||||
self.done[callIDX] =1
|
||||
"""
|
||||
User facing function to remotely execute queued tasks.
|
||||
"""
|
||||
return asyncio.run(self._runAllCalls(callDelay=callDelay))
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import asyncio
|
||||
import base64
|
||||
from threading import Thread
|
||||
from typing import List
|
||||
from uvicorn import Config, Server
|
||||
import fastapi
|
||||
from uvicorn.config import LOG_LEVELS
|
||||
import pickle as pkl
|
||||
import uuid
|
||||
from ._callSpec import _ClientPacket
|
||||
from ._callSpec import _ClientPacket, _CallPacket
|
||||
|
||||
__all__ = ["startRouter"]
|
||||
|
||||
@@ -13,42 +15,93 @@ class _Router:
|
||||
def __init__(self, pollingDelay=0.5) -> None:
|
||||
self.router = fastapi.APIRouter()
|
||||
self.router.add_api_websocket_route("/reg", self.registerRunner)
|
||||
self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"])
|
||||
self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
|
||||
self.taskQueue = asyncio.Queue()
|
||||
self.runnerCount=0
|
||||
self.returnDict = {}
|
||||
self.doneDict = {}
|
||||
self.pollingDelay = pollingDelay
|
||||
|
||||
|
||||
async def registerRunner(self, wsConnection: fastapi.WebSocket):
|
||||
"""
|
||||
Method which queries an available task and sends the data to the attached runner.
|
||||
"""
|
||||
await wsConnection.accept()
|
||||
await wsConnection.send_text(str(self.runnerCount))
|
||||
methods=await wsConnection.receive()
|
||||
methods = pkl.loads(base64.b64decode(methods["text"]))
|
||||
print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}")
|
||||
runnerID=self.runnerCount
|
||||
self.runnerCount+=1
|
||||
runnerCounter = 0
|
||||
while True:
|
||||
reqID, data = await self.taskQueue.get()
|
||||
runnerCounter+=1
|
||||
print(f"Runr {runnerID} Counter: {runnerCounter}")
|
||||
await wsConnection.send_bytes(pkl.dumps(data))
|
||||
retValue = await wsConnection.receive()
|
||||
self.returnDict[reqID] = retValue["bytes"]
|
||||
print(f"Tasks left: {self.taskQueue.qsize()}")
|
||||
self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
|
||||
|
||||
async def clientRequest(self, data:_ClientPacket):
|
||||
"""
|
||||
Method to handle single request, adds the task to queue and awaits for result.
|
||||
To be deprecated for better task handling.
|
||||
"""
|
||||
reqID = uuid.uuid4().hex
|
||||
callPacket = pkl.loads(base64.b64decode(data.data))
|
||||
callPacket = data
|
||||
await self.taskQueue.put((reqID, callPacket))
|
||||
while reqID not in self.returnDict:
|
||||
await asyncio.sleep(self.pollingDelay)
|
||||
# await asyncio.sleep(1)
|
||||
returnValue = self.returnDict[reqID]
|
||||
self.returnDict.pop(reqID)
|
||||
return returnValue
|
||||
|
||||
async def multiClientRequest(self, wsConn:fastapi.WebSocket, count:int):
|
||||
"""
|
||||
Method accepts a task list and adds them to the queue.
|
||||
Returns the results to client.
|
||||
"""
|
||||
await wsConn.accept()
|
||||
softLimit=50
|
||||
await wsConn.send_text(str(count))
|
||||
reqID = uuid.uuid4().hex
|
||||
self.returnDict[reqID] = [0]*count
|
||||
self.doneDict[reqID] = [0]*count
|
||||
print(f"Received {count} tasks")
|
||||
taskBytes = await wsConn.receive_bytes()
|
||||
taskPackets = pkl.loads(taskBytes)
|
||||
softLimitItr = 0
|
||||
for task in range(len(taskPackets)):
|
||||
while (task > (softLimitItr+softLimit)) and not self.doneDict[reqID][softLimitItr]==1:
|
||||
await asyncio.sleep(1)
|
||||
if self.doneDict[reqID][softLimitItr]==1:
|
||||
softLimitItr+=1
|
||||
t=Thread(target=self._worker, args=(reqID, task, taskPackets[task]))
|
||||
t.daemon=True
|
||||
t.start()
|
||||
while not all(self.doneDict[reqID]):
|
||||
await asyncio.sleep(1)
|
||||
await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID]))
|
||||
self.returnDict.pop(reqID)
|
||||
|
||||
def _worker(self, id, idx, data:_ClientPacket):
|
||||
"""
|
||||
Thread worker to handle one task.
|
||||
To be depricated for better task handling.
|
||||
"""
|
||||
retVal = asyncio.run(self.clientRequest(data))
|
||||
self.returnDict[id][idx]=retVal
|
||||
self.doneDict[id][idx]=1
|
||||
return
|
||||
|
||||
def startRouter(host, port, pollingDelay=0.1, logLevel=3):
|
||||
"""
|
||||
Main function to start the router system.
|
||||
"""
|
||||
br = _Router(pollingDelay=pollingDelay)
|
||||
app = fastapi.FastAPI()
|
||||
app.include_router(br.router)
|
||||
level = list(LOG_LEVELS.keys())[logLevel]
|
||||
serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS[level], ws_ping_interval=10, ws_ping_timeout=None)
|
||||
serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS[level], ws_ping_interval=10, ws_ping_timeout=None, ws_max_size=1024*1024*1024)
|
||||
server = Server(config=serverConf)
|
||||
server.run()
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import base64
|
||||
from typing import Any, Dict
|
||||
# from fastapi import WebSocketException
|
||||
from websockets.asyncio import client as WSC
|
||||
from websockets.exceptions import WebSocketException
|
||||
import asyncio
|
||||
@@ -10,6 +9,10 @@ from ._callSpec import _CallPacket
|
||||
__all__ = ["startRunner"]
|
||||
|
||||
async def _send(funcMap: Dict[str, Any], url):
|
||||
"""
|
||||
Main logic funcion, connects to the router, takes the incoming task, executes and returns the result.
|
||||
To improve error handling from the mapped function side.
|
||||
"""
|
||||
counter=0
|
||||
async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
|
||||
try:
|
||||
@@ -29,4 +32,7 @@ async def _send(funcMap: Dict[str, Any], url):
|
||||
await w.close()
|
||||
|
||||
def startRunner(funcMapping, host, port):
|
||||
"""
|
||||
Main function to call from the user code.
|
||||
"""
|
||||
asyncio.run(_send(funcMapping, f"ws://{host}:{port}/reg"))
|
||||
|
||||
@@ -4,13 +4,13 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "harmoney"
|
||||
version = "0.2.0"
|
||||
description = "Simple Remote Function Calling Framework"
|
||||
version = "0.3.0"
|
||||
description = "Scalable Remote Function Calling Framework"
|
||||
authors = [{name = "Phani Pavan K", email = "kphanipavan@gmail.com"}]
|
||||
license = {text = "AGPLv3"}
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
dependencies = ["websockets>=15.0", "uvicorn>=0.34.0", "fastapi>=0.115.8", "pydantic>=2.10.6", "requests>=2.31.0"]
|
||||
dependencies = ["websockets>=15.0", "uvicorn>=0.34.0", "fastapi>=0.115.8", "pydantic>=2.10.6"]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://git.pvnweb.dedyn.io/phanipavank/harmoney"
|
||||
Homepage = "https://github.com/kphanipavan/harmoney"
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
fastapi
|
||||
pydantic
|
||||
websockets
|
||||
requests
|
||||
|
||||
Reference in New Issue
Block a user