diff --git a/broker/__pycache__/callSpec.cpython-311.pyc b/broker/__pycache__/callSpec.cpython-311.pyc deleted file mode 100644 index 5b374e9..0000000 Binary files a/broker/__pycache__/callSpec.cpython-311.pyc and /dev/null differ diff --git a/broker/taskQueue.py b/broker/taskQueue.py deleted file mode 100644 index 4258408..0000000 --- a/broker/taskQueue.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -Task queue manager class, courtesy of Shashwat Kumar -https://medium.com/@shashwat_ds/a-tiny-multi-threaded-job-queue-in-30-lines-of-python-a344c3f3f7f0 - -Later modified by Phani Pavan K to meet PEP8.1. -Stage 2 modifications include adding multiprocessing for scale up. -""" - -import queue -from threading import Thread -from multiprocessing import Process -import time - -class TaskQueue(queue.Queue): - def __init__(self, num_workers=1): - queue.Queue.__init__(self) - self.num_workers = num_workers - self.startWorkers() - - def addTask(self, task, *args, **kwargs): - args = args or () - kwargs = kwargs or {} - self.put((task, args, kwargs)) - - def startWorkers(self): - for _ in range(self.num_workers): - t = Thread(target=self.worker) - t.daemon = True - t.start() - - def worker(self): - while True: - item, args, kwargs = self.get() - p=Process(target=item, args=args, kwargs=kwargs) - p.start() - p.join() - self.task_done() - -if __name__ == "__main__": - testQueue = TaskQueue(4) - - def printer(x): - for i in range(3): - print(x) - time.sleep(1) - - for i in range(10): - testQueue.addTask(printer, f"Task {i}") - print(f"Added task {i}") - print("Done adding tasks to the queue") - testQueue.join() diff --git a/client/__pycache__/callSpec.cpython-313.pyc b/client/__pycache__/callSpec.cpython-313.pyc deleted file mode 100644 index fd55df1..0000000 Binary files a/client/__pycache__/callSpec.cpython-313.pyc and /dev/null differ diff --git a/client/main.py b/client/main.py deleted file mode 100644 index 8f53247..0000000 --- a/client/main.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests as req -import callSpec - - -testObj = callSpec.CallPacket(procedure="arstarst", simpleObjects={"arst":"qwfpp"}) -print(testObj.model_dump_json()) -x=req.post("http://127.0.0.1:7732/cliReq", json=testObj.model_dump()) -print(x.status_code) -print(x.text) diff --git a/harmoney/broker/callSpec.py b/harmoney/broker/callSpec.py new file mode 100644 index 0000000..9be115d --- /dev/null +++ b/harmoney/broker/callSpec.py @@ -0,0 +1,10 @@ +from typing import Any, Dict + +import pydantic + +class CallPacket(pydantic.BaseModel): + procedure: str + data: Dict[str, Any] = {} + +class ClientPacket(pydantic.BaseModel): + data: str diff --git a/broker/main.py b/harmoney/broker/main.py similarity index 68% rename from broker/main.py rename to harmoney/broker/main.py index 83690ba..f3106db 100644 --- a/broker/main.py +++ b/harmoney/broker/main.py @@ -1,12 +1,12 @@ import asyncio -from typing import Dict +import base64 from uvicorn import Config, Server import fastapi -from threading import Thread from uvicorn.config import LOG_LEVELS +import pickle as pkl -from callSpec import CallPacket +from callSpec import CallPacket, ClientPacket class Broker: @@ -24,21 +24,25 @@ class Broker: print("started") while True: - data = await self.taskQueue.get() + data:CallPacket = await self.taskQueue.get() # await asyncio.sleep(1) - await wsConnection.send_text(data) + await wsConnection.send_bytes(pkl.dumps(data)) + print(await wsConnection.receive()) print(f"left: {self.taskQueue.qsize()}") - async def clientRequest(self, data:CallPacket): + async def clientRequest(self, data:ClientPacket): print(data) + callPacket = pkl.loads(base64.b64decode(data.data)) + await self.taskQueue.put(callPacket) + print(self.taskQueue.qsize) # await self.taskQueue.put(name) def runBroker(): br = Broker() app = fastapi.FastAPI() app.include_router(br.router) - serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["trace"], ws_ping_interval=10, ws_ping_timeout=None) + serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["debug"], ws_ping_interval=10, ws_ping_timeout=None) server = Server(config=serverConf) server.run() diff --git a/client/callSpec.py b/harmoney/client/callSpec.py similarity index 57% rename from client/callSpec.py rename to harmoney/client/callSpec.py index 419c8f3..7ecbe48 100644 --- a/client/callSpec.py +++ b/harmoney/client/callSpec.py @@ -4,5 +4,4 @@ import pydantic class CallPacket(pydantic.BaseModel): procedure: str - simpleObjects: Dict[str, Any] = {} - byteObjects: Dict[str, bytes] = {} + data: Dict[str, Any ] = {} diff --git a/harmoney/client/main.py b/harmoney/client/main.py new file mode 100644 index 0000000..030496a --- /dev/null +++ b/harmoney/client/main.py @@ -0,0 +1,12 @@ +import requests as req +import callSpec +import numpy as np +import pickle as pkl +import base64 +testNPArray = np.ones((3,3,3)) +testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} ) +x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) +testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} ) +x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) +print(x.status_code) +print(x.text) diff --git a/broker/callSpec.py b/harmoney/runner/callSpec.py similarity index 57% rename from broker/callSpec.py rename to harmoney/runner/callSpec.py index 419c8f3..7ecbe48 100644 --- a/broker/callSpec.py +++ b/harmoney/runner/callSpec.py @@ -4,5 +4,4 @@ import pydantic class CallPacket(pydantic.BaseModel): procedure: str - simpleObjects: Dict[str, Any] = {} - byteObjects: Dict[str, bytes] = {} + data: Dict[str, Any ] = {} diff --git a/harmoney/runner/mail.py b/harmoney/runner/mail.py new file mode 100644 index 0000000..ac064ab --- /dev/null +++ b/harmoney/runner/mail.py @@ -0,0 +1,23 @@ +from websockets.asyncio import client as WSC +import asyncio +import pickle as pkl +import numpy as np + +from callSpec import CallPacket + +async def test(funcMap): + counter=0 + async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w: + while True: + counter+=1 + packetBytes=await w.recv() + callPk:CallPacket = pkl.loads(packetBytes) + # await asyncio.sleep(1) + print(callPk) + print(funcMap[callPk.procedure](**callPk.data)) + await w.send(str(counter), text=True) + + +funcMapping = {"sum": np.sum, "avg": np.average} + +asyncio.run(test(funcMapping)) diff --git a/runner/mail.py b/runner/mail.py deleted file mode 100644 index 7796f69..0000000 --- a/runner/mail.py +++ /dev/null @@ -1,18 +0,0 @@ -import requests as req -from websockets.asyncio import client as WSC -import asyncio - -async def test(func): - counter=0 - async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w: - while True: - counter+=1 - x=await w.recv() - await asyncio.sleep(1) - func() - print(x) - await w.send(str(counter), text=True) - - - -asyncio.run(test(print))