diff --git a/harmoney/__init__.py b/harmoney/__init__.py new file mode 100644 index 0000000..b81cdfb --- /dev/null +++ b/harmoney/__init__.py @@ -0,0 +1,5 @@ +from . import client +from . import runner +from . import broker + +__all__ = ["client", "runner", "broker"] diff --git a/harmoney/broker/__init__.py b/harmoney/broker/__init__.py new file mode 100644 index 0000000..f9cb6ed --- /dev/null +++ b/harmoney/broker/__init__.py @@ -0,0 +1,3 @@ +from .main import runBroker + +__all__ = ["runBroker"] diff --git a/harmoney/broker/main.py b/harmoney/broker/main.py index f3106db..a4b056f 100644 --- a/harmoney/broker/main.py +++ b/harmoney/broker/main.py @@ -2,34 +2,33 @@ import asyncio import base64 from uvicorn import Config, Server import fastapi - from uvicorn.config import LOG_LEVELS import pickle as pkl - from callSpec import CallPacket, ClientPacket +__all__ = ["runBroker"] class Broker: def __init__(self) -> None: - self.runnerQueue = [] self.router = fastapi.APIRouter() self.router.add_api_websocket_route("/reg", self.registerRunner) self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"]) self.taskQueue = asyncio.Queue() + self.runnerCount=0 async def registerRunner(self, wsConnection: fastapi.WebSocket): - await wsConnection.accept() - - print("started") + await wsConnection.send_text(str(self.runnerCount)) + methods=await wsConnection.receive_json() + print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}") + self.runnerCount+=1 while True: data:CallPacket = await self.taskQueue.get() # await asyncio.sleep(1) await wsConnection.send_bytes(pkl.dumps(data)) - - print(await wsConnection.receive()) - print(f"left: {self.taskQueue.qsize()}") + retValue = await wsConnection.receive() + print(f"Tasks left: {self.taskQueue.qsize()}") async def clientRequest(self, data:ClientPacket): print(data) @@ -38,12 +37,12 @@ class Broker: print(self.taskQueue.qsize) # await self.taskQueue.put(name) -def runBroker(): +def runBroker(host, port): br = Broker() app = fastapi.FastAPI() app.include_router(br.router) - serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["debug"], ws_ping_interval=10, ws_ping_timeout=None) + serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS["warning"], ws_ping_interval=10, ws_ping_timeout=None) server = Server(config=serverConf) server.run() -runBroker() +# runBroker() diff --git a/harmoney/client/__init__.py b/harmoney/client/__init__.py new file mode 100644 index 0000000..a08bbcf --- /dev/null +++ b/harmoney/client/__init__.py @@ -0,0 +1,3 @@ +from .main import Client + +__all__ = ["Client"] diff --git a/harmoney/client/main.py b/harmoney/client/main.py index 030496a..9ca8271 100644 --- a/harmoney/client/main.py +++ b/harmoney/client/main.py @@ -1,12 +1,23 @@ import requests as req import callSpec -import numpy as np import pickle as pkl import base64 -testNPArray = np.ones((3,3,3)) -testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} ) -x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) -testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} ) -x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) -print(x.status_code) -print(x.text) +# testNPArray = np.random.random((3,3,3, 3)) +# testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} ) +# x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) +# testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} ) +# x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) +# print(x.status_code) +# print(x.text) + +class Client: + def __init__(self, host, port) -> None: + self.url = f"http://{host}:{port}/cliReq" + + def rCall(self, function, **kwargs): + callPacket = callSpec.CallPacket(procedure=function, data=kwargs) + payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")} + resp = req.post(self.url, json=payload) + print(resp.status_code) + print(resp.text) + return resp diff --git a/harmoney/runner/__init__.py b/harmoney/runner/__init__.py new file mode 100644 index 0000000..839f9b0 --- /dev/null +++ b/harmoney/runner/__init__.py @@ -0,0 +1,3 @@ +from .main import startRunner + +__all__ = ["startRunner"] diff --git a/harmoney/runner/mail.py b/harmoney/runner/mail.py deleted file mode 100644 index ac064ab..0000000 --- a/harmoney/runner/mail.py +++ /dev/null @@ -1,23 +0,0 @@ -from websockets.asyncio import client as WSC -import asyncio -import pickle as pkl -import numpy as np - -from callSpec import CallPacket - -async def test(funcMap): - counter=0 - async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w: - while True: - counter+=1 - packetBytes=await w.recv() - callPk:CallPacket = pkl.loads(packetBytes) - # await asyncio.sleep(1) - print(callPk) - print(funcMap[callPk.procedure](**callPk.data)) - await w.send(str(counter), text=True) - - -funcMapping = {"sum": np.sum, "avg": np.average} - -asyncio.run(test(funcMapping)) diff --git a/harmoney/runner/main.py b/harmoney/runner/main.py new file mode 100644 index 0000000..7391744 --- /dev/null +++ b/harmoney/runner/main.py @@ -0,0 +1,27 @@ +from typing import Any, Dict +from websockets.asyncio import client as WSC +import asyncio +import pickle as pkl +from callSpec import CallPacket + +__all__ = ["startRunner"] + +async def test(funcMap: Dict[str, Any], url): + counter=0 + async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w: + id = await w.recv() + id = int(id) + print(f"Starting Runner, ID: {id}") + await w.send({"methods":list(funcMap.keys())}) + while True: + counter+=1 + packetBytes=await w.recv() + callPk:CallPacket = pkl.loads(packetBytes) + print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\n" + "-"*25) + funcOutput = funcMap[callPk.procedure](**callPk.data) + await w.send(str(counter)) + + + +def startRunner(funcMapping, host, port): + asyncio.run(test(funcMapping, f"ws://{host}:{port}/reg"))