refactor, implement value return system

This commit is contained in:
2025-03-02 12:33:10 +05:30
parent a1ef1d53d6
commit a337d57107
13 changed files with 55 additions and 73 deletions

View File

@@ -1,5 +1 @@
from . import client
from . import runner
from . import broker
__all__ = ["client", "runner", "broker"]

View File

@@ -1,10 +1,9 @@
from typing import Any, Dict from typing import Any, Dict
import pydantic import pydantic
class CallPacket(pydantic.BaseModel): class _CallPacket(pydantic.BaseModel):
procedure: str procedure: str
data: Dict[str, Any] = {} data: Dict[str, Any] = {}
class ClientPacket(pydantic.BaseModel): class _ClientPacket(pydantic.BaseModel):
data: str data: str

View File

@@ -4,41 +4,52 @@ from uvicorn import Config, Server
import fastapi import fastapi
from uvicorn.config import LOG_LEVELS from uvicorn.config import LOG_LEVELS
import pickle as pkl import pickle as pkl
from callSpec import CallPacket, ClientPacket import uuid
from ._callSpec import _CallPacket, _ClientPacket
__all__ = ["runBroker"] __all__ = ["runBroker"]
class Broker: class _Broker:
def __init__(self) -> None: def __init__(self) -> None:
self.router = fastapi.APIRouter() self.router = fastapi.APIRouter()
self.router.add_api_websocket_route("/reg", self.registerRunner) self.router.add_api_websocket_route("/reg", self.registerRunner)
self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"]) self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"])
self.taskQueue = asyncio.Queue() self.taskQueue = asyncio.Queue()
self.runnerCount=0 self.runnerCount=0
self.returnDict = {}
async def registerRunner(self, wsConnection: fastapi.WebSocket): async def registerRunner(self, wsConnection: fastapi.WebSocket):
await wsConnection.accept() await wsConnection.accept()
await wsConnection.send_text(str(self.runnerCount)) await wsConnection.send_text(str(self.runnerCount))
methods=await wsConnection.receive_json() methods=await wsConnection.receive()
methods = pkl.loads(base64.b64decode(methods["text"]))
print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}") print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}")
self.runnerCount+=1 self.runnerCount+=1
while True: while True:
data:CallPacket = await self.taskQueue.get() reqID, data = await self.taskQueue.get()
# await asyncio.sleep(1) # await asyncio.sleep(1)
await wsConnection.send_bytes(pkl.dumps(data)) await wsConnection.send_bytes(pkl.dumps(data))
retValue = await wsConnection.receive() retValue = await wsConnection.receive()
# print(retValue)
self.returnDict[reqID] = retValue["bytes"]
# print(retValue["bytes"])
print(f"Tasks left: {self.taskQueue.qsize()}") print(f"Tasks left: {self.taskQueue.qsize()}")
async def clientRequest(self, data:ClientPacket): async def clientRequest(self, data:_ClientPacket):
print(data) # print(data)
reqID = uuid.uuid4().hex
callPacket = pkl.loads(base64.b64decode(data.data)) callPacket = pkl.loads(base64.b64decode(data.data))
await self.taskQueue.put(callPacket) await self.taskQueue.put((reqID, callPacket))
print(self.taskQueue.qsize) # print(self.taskQueue.qsize)
# await self.taskQueue.put(name) while reqID not in self.returnDict:
await asyncio.sleep(0.5)
await asyncio.sleep(1)
returnValue = self.returnDict[reqID]
return returnValue
def runBroker(host, port): def runBroker(host, port):
br = Broker() br = _Broker()
app = fastapi.FastAPI() app = fastapi.FastAPI()
app.include_router(br.router) app.include_router(br.router)
serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS["warning"], ws_ping_interval=10, ws_ping_timeout=None) serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS["warning"], ws_ping_interval=10, ws_ping_timeout=None)

View File

@@ -1,3 +0,0 @@
from .main import runBroker
__all__ = ["runBroker"]

18
harmoney/client.py Normal file
View File

@@ -0,0 +1,18 @@
import requests as req
from ._callSpec import _CallPacket
import pickle as pkl
import base64
__all__ = ["Client"]
class Client:
def __init__(self, host, port) -> None:
self._url = f"http://{host}:{port}/cliReq"
def rCall(self, function, **kwargs):
callPacket = _CallPacket(procedure=function, data=kwargs)
payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")}
resp = req.post(self._url, json=payload)
# print(resp.status_code)
# print(resp.text)
return pkl.loads(base64.b64decode(resp.text))

View File

@@ -1,3 +0,0 @@
from .main import Client
__all__ = ["Client"]

View File

@@ -1,7 +0,0 @@
from typing import Any, Dict
import pydantic
class CallPacket(pydantic.BaseModel):
procedure: str
data: Dict[str, Any ] = {}

View File

@@ -1,23 +0,0 @@
import requests as req
import callSpec
import pickle as pkl
import base64
# testNPArray = np.random.random((3,3,3, 3))
# testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} )
# x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")})
# testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} )
# x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")})
# print(x.status_code)
# print(x.text)
class Client:
def __init__(self, host, port) -> None:
self.url = f"http://{host}:{port}/cliReq"
def rCall(self, function, **kwargs):
callPacket = callSpec.CallPacket(procedure=function, data=kwargs)
payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")}
resp = req.post(self.url, json=payload)
print(resp.status_code)
print(resp.text)
return resp

View File

@@ -1,27 +1,26 @@
import base64
from typing import Any, Dict from typing import Any, Dict
from websockets.asyncio import client as WSC from websockets.asyncio import client as WSC
import asyncio import asyncio
import pickle as pkl import pickle as pkl
from callSpec import CallPacket from ._callSpec import _CallPacket
__all__ = ["startRunner"] __all__ = ["startRunner"]
async def test(funcMap: Dict[str, Any], url): async def _test(funcMap: Dict[str, Any], url):
counter=0 counter=0
async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w: async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
id = await w.recv() id = await w.recv()
id = int(id) id = int(id)
print(f"Starting Runner, ID: {id}") print(f"Starting Runner, ID: {id}")
await w.send({"methods":list(funcMap.keys())}) await w.send(base64.b64encode(pkl.dumps({"methods":list(funcMap.keys())})).decode("utf-8"))
while True: while True:
counter+=1 counter+=1
packetBytes=await w.recv() packetBytes=await w.recv()
callPk:CallPacket = pkl.loads(packetBytes) callPk:_CallPacket = pkl.loads(packetBytes)
print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\n" + "-"*25) print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\n" + "-"*50)
funcOutput = funcMap[callPk.procedure](**callPk.data) funcOutput = funcMap[callPk.procedure](**callPk.data)
await w.send(str(counter)) await w.send(base64.b64encode(pkl.dumps(funcOutput)))
def startRunner(funcMapping, host, port): def startRunner(funcMapping, host, port):
asyncio.run(test(funcMapping, f"ws://{host}:{port}/reg")) asyncio.run(_test(funcMapping, f"ws://{host}:{port}/reg"))

View File

@@ -1,3 +0,0 @@
from .main import startRunner
__all__ = ["startRunner"]

View File

@@ -1,7 +0,0 @@
from typing import Any, Dict
import pydantic
class CallPacket(pydantic.BaseModel):
procedure: str
data: Dict[str, Any ] = {}

View File

@@ -1,2 +1,4 @@
msgspec
fastapi fastapi
pydantic
websockets
requests

3
setup.py Normal file
View File

@@ -0,0 +1,3 @@
from setuptools import setup, find_packages
setup(name="harmoney", version="0.1.0", packages=find_packages())