diff --git a/harmoney/__init__.py b/harmoney/__init__.py index b81cdfb..8b13789 100644 --- a/harmoney/__init__.py +++ b/harmoney/__init__.py @@ -1,5 +1 @@ -from . import client -from . import runner -from . import broker -__all__ = ["client", "runner", "broker"] diff --git a/harmoney/broker/callSpec.py b/harmoney/_callSpec.py similarity index 57% rename from harmoney/broker/callSpec.py rename to harmoney/_callSpec.py index 9be115d..1769999 100644 --- a/harmoney/broker/callSpec.py +++ b/harmoney/_callSpec.py @@ -1,10 +1,9 @@ from typing import Any, Dict - import pydantic -class CallPacket(pydantic.BaseModel): +class _CallPacket(pydantic.BaseModel): procedure: str data: Dict[str, Any] = {} -class ClientPacket(pydantic.BaseModel): +class _ClientPacket(pydantic.BaseModel): data: str diff --git a/harmoney/broker/main.py b/harmoney/broker.py similarity index 62% rename from harmoney/broker/main.py rename to harmoney/broker.py index a4b056f..c33588d 100644 --- a/harmoney/broker/main.py +++ b/harmoney/broker.py @@ -4,41 +4,52 @@ from uvicorn import Config, Server import fastapi from uvicorn.config import LOG_LEVELS import pickle as pkl -from callSpec import CallPacket, ClientPacket +import uuid +from ._callSpec import _CallPacket, _ClientPacket __all__ = ["runBroker"] -class Broker: +class _Broker: def __init__(self) -> None: self.router = fastapi.APIRouter() self.router.add_api_websocket_route("/reg", self.registerRunner) self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"]) self.taskQueue = asyncio.Queue() self.runnerCount=0 + self.returnDict = {} async def registerRunner(self, wsConnection: fastapi.WebSocket): await wsConnection.accept() await wsConnection.send_text(str(self.runnerCount)) - methods=await wsConnection.receive_json() + methods=await wsConnection.receive() + methods = pkl.loads(base64.b64decode(methods["text"])) print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}") self.runnerCount+=1 while True: - data:CallPacket = await self.taskQueue.get() + reqID, data = await self.taskQueue.get() # await asyncio.sleep(1) await wsConnection.send_bytes(pkl.dumps(data)) retValue = await wsConnection.receive() + # print(retValue) + self.returnDict[reqID] = retValue["bytes"] + # print(retValue["bytes"]) print(f"Tasks left: {self.taskQueue.qsize()}") - async def clientRequest(self, data:ClientPacket): - print(data) + async def clientRequest(self, data:_ClientPacket): + # print(data) + reqID = uuid.uuid4().hex callPacket = pkl.loads(base64.b64decode(data.data)) - await self.taskQueue.put(callPacket) - print(self.taskQueue.qsize) - # await self.taskQueue.put(name) + await self.taskQueue.put((reqID, callPacket)) + # print(self.taskQueue.qsize) + while reqID not in self.returnDict: + await asyncio.sleep(0.5) + await asyncio.sleep(1) + returnValue = self.returnDict[reqID] + return returnValue def runBroker(host, port): - br = Broker() + br = _Broker() app = fastapi.FastAPI() app.include_router(br.router) serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS["warning"], ws_ping_interval=10, ws_ping_timeout=None) diff --git a/harmoney/broker/__init__.py b/harmoney/broker/__init__.py deleted file mode 100644 index f9cb6ed..0000000 --- a/harmoney/broker/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .main import runBroker - -__all__ = ["runBroker"] diff --git a/harmoney/client.py b/harmoney/client.py new file mode 100644 index 0000000..65ac4c0 --- /dev/null +++ b/harmoney/client.py @@ -0,0 +1,18 @@ +import requests as req +from ._callSpec import _CallPacket +import pickle as pkl +import base64 + +__all__ = ["Client"] + +class Client: + def __init__(self, host, port) -> None: + self._url = f"http://{host}:{port}/cliReq" + + def rCall(self, function, **kwargs): + callPacket = _CallPacket(procedure=function, data=kwargs) + payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")} + resp = req.post(self._url, json=payload) + # print(resp.status_code) + # print(resp.text) + return pkl.loads(base64.b64decode(resp.text)) diff --git a/harmoney/client/__init__.py b/harmoney/client/__init__.py deleted file mode 100644 index a08bbcf..0000000 --- a/harmoney/client/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .main import Client - -__all__ = ["Client"] diff --git a/harmoney/client/callSpec.py b/harmoney/client/callSpec.py deleted file mode 100644 index 7ecbe48..0000000 --- a/harmoney/client/callSpec.py +++ /dev/null @@ -1,7 +0,0 @@ -from typing import Any, Dict - -import pydantic - -class CallPacket(pydantic.BaseModel): - procedure: str - data: Dict[str, Any ] = {} diff --git a/harmoney/client/main.py b/harmoney/client/main.py deleted file mode 100644 index 9ca8271..0000000 --- a/harmoney/client/main.py +++ /dev/null @@ -1,23 +0,0 @@ -import requests as req -import callSpec -import pickle as pkl -import base64 -# testNPArray = np.random.random((3,3,3, 3)) -# testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} ) -# x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) -# testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} ) -# x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) -# print(x.status_code) -# print(x.text) - -class Client: - def __init__(self, host, port) -> None: - self.url = f"http://{host}:{port}/cliReq" - - def rCall(self, function, **kwargs): - callPacket = callSpec.CallPacket(procedure=function, data=kwargs) - payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")} - resp = req.post(self.url, json=payload) - print(resp.status_code) - print(resp.text) - return resp diff --git a/harmoney/runner/main.py b/harmoney/runner.py similarity index 59% rename from harmoney/runner/main.py rename to harmoney/runner.py index 7391744..202ab1c 100644 --- a/harmoney/runner/main.py +++ b/harmoney/runner.py @@ -1,27 +1,26 @@ +import base64 from typing import Any, Dict from websockets.asyncio import client as WSC import asyncio import pickle as pkl -from callSpec import CallPacket +from ._callSpec import _CallPacket __all__ = ["startRunner"] -async def test(funcMap: Dict[str, Any], url): +async def _test(funcMap: Dict[str, Any], url): counter=0 async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w: id = await w.recv() id = int(id) print(f"Starting Runner, ID: {id}") - await w.send({"methods":list(funcMap.keys())}) + await w.send(base64.b64encode(pkl.dumps({"methods":list(funcMap.keys())})).decode("utf-8")) while True: counter+=1 packetBytes=await w.recv() - callPk:CallPacket = pkl.loads(packetBytes) - print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\n" + "-"*25) + callPk:_CallPacket = pkl.loads(packetBytes) + print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\n" + "-"*50) funcOutput = funcMap[callPk.procedure](**callPk.data) - await w.send(str(counter)) - - + await w.send(base64.b64encode(pkl.dumps(funcOutput))) def startRunner(funcMapping, host, port): - asyncio.run(test(funcMapping, f"ws://{host}:{port}/reg")) + asyncio.run(_test(funcMapping, f"ws://{host}:{port}/reg")) diff --git a/harmoney/runner/__init__.py b/harmoney/runner/__init__.py deleted file mode 100644 index 839f9b0..0000000 --- a/harmoney/runner/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .main import startRunner - -__all__ = ["startRunner"] diff --git a/harmoney/runner/callSpec.py b/harmoney/runner/callSpec.py deleted file mode 100644 index 7ecbe48..0000000 --- a/harmoney/runner/callSpec.py +++ /dev/null @@ -1,7 +0,0 @@ -from typing import Any, Dict - -import pydantic - -class CallPacket(pydantic.BaseModel): - procedure: str - data: Dict[str, Any ] = {} diff --git a/requirements.txt b/requirements.txt index faaadf3..f185d43 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ -msgspec fastapi +pydantic +websockets +requests diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..4d5478a --- /dev/null +++ b/setup.py @@ -0,0 +1,3 @@ +from setuptools import setup, find_packages + +setup(name="harmoney", version="0.1.0", packages=find_packages())