From 5dfc478eda315be2be9de8c45072e09693d4d179 Mon Sep 17 00:00:00 2001 From: Phani Pavan K Date: Sat, 1 Mar 2025 16:02:57 +0530 Subject: [PATCH] restructure, properly working --- broker/__pycache__/callSpec.cpython-311.pyc | Bin 726 -> 0 bytes broker/taskQueue.py | 51 -------------------- client/__pycache__/callSpec.cpython-313.pyc | Bin 676 -> 0 bytes client/main.py | 9 ---- harmoney/broker/callSpec.py | 10 ++++ {broker => harmoney/broker}/main.py | 18 ++++--- {client => harmoney/client}/callSpec.py | 3 +- harmoney/client/main.py | 12 +++++ {broker => harmoney/runner}/callSpec.py | 3 +- harmoney/runner/mail.py | 23 +++++++++ runner/mail.py | 18 ------- 11 files changed, 58 insertions(+), 89 deletions(-) delete mode 100644 broker/__pycache__/callSpec.cpython-311.pyc delete mode 100644 broker/taskQueue.py delete mode 100644 client/__pycache__/callSpec.cpython-313.pyc delete mode 100644 client/main.py create mode 100644 harmoney/broker/callSpec.py rename {broker => harmoney/broker}/main.py (68%) rename {client => harmoney/client}/callSpec.py (57%) create mode 100644 harmoney/client/main.py rename {broker => harmoney/runner}/callSpec.py (57%) create mode 100644 harmoney/runner/mail.py delete mode 100644 runner/mail.py diff --git a/broker/__pycache__/callSpec.cpython-311.pyc b/broker/__pycache__/callSpec.cpython-311.pyc deleted file mode 100644 index 5b374e99dd782dc11f3f226ee2a60ba2bd9151bd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 726 zcmZuuzmL-}6t?4}q=l;Vgb+w92vpPovK-s#q=SPkhC@T5 zoj8}~Zs2FGzur4^rS6F9Ac$q;N~E2EemeOW{jzjGO1ViZ#}(U9s(d zSfp8MY7w&Yn1taA;vcX~wo{(?jLtZdT@gfS$exd<%+G|pJkGNQxWkULF^);ZFqR#R zqlC^v9bdut?TmzL28oQz9>ye&lZ<3R5(|vEt&ZmUWw^c{Z!0G)U0r+vD+R#WeP=(3 zm@^595RRWvCY*i3qa(wVWr~^23&FWqBIJlUljHgS69} zk~j-|*?vR>+f!=8TG3UVS0!(#=*-@#Uum0PR}iWpUMsgm5W)qxb#hVv4thn^mL`Jt Vmf(L^-Z<^w`K5{97qHdwe*hjlvnT)n diff --git a/broker/taskQueue.py b/broker/taskQueue.py deleted file mode 100644 index 4258408..0000000 --- a/broker/taskQueue.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -Task queue manager class, courtesy of Shashwat Kumar -https://medium.com/@shashwat_ds/a-tiny-multi-threaded-job-queue-in-30-lines-of-python-a344c3f3f7f0 - -Later modified by Phani Pavan K to meet PEP8.1. -Stage 2 modifications include adding multiprocessing for scale up. -""" - -import queue -from threading import Thread -from multiprocessing import Process -import time - -class TaskQueue(queue.Queue): - def __init__(self, num_workers=1): - queue.Queue.__init__(self) - self.num_workers = num_workers - self.startWorkers() - - def addTask(self, task, *args, **kwargs): - args = args or () - kwargs = kwargs or {} - self.put((task, args, kwargs)) - - def startWorkers(self): - for _ in range(self.num_workers): - t = Thread(target=self.worker) - t.daemon = True - t.start() - - def worker(self): - while True: - item, args, kwargs = self.get() - p=Process(target=item, args=args, kwargs=kwargs) - p.start() - p.join() - self.task_done() - -if __name__ == "__main__": - testQueue = TaskQueue(4) - - def printer(x): - for i in range(3): - print(x) - time.sleep(1) - - for i in range(10): - testQueue.addTask(printer, f"Task {i}") - print(f"Added task {i}") - print("Done adding tasks to the queue") - testQueue.join() diff --git a/client/__pycache__/callSpec.cpython-313.pyc b/client/__pycache__/callSpec.cpython-313.pyc deleted file mode 100644 index fd55df1822e7199fbda49dd159a6213be722d19c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 676 zcmYLGJ#Q015Z$}IbNmr2GD@U?Kmr1eECEO1bSM%ZqKFj}ajm7*dc8Oo&U?q~y0{LB zg4Ps~{1^BG)cl2j1kpAWsnQ`CdS?AWR@&KlZ{}&<+|BCsD#7!u{dM>S-wOv;CtN18 zmzbQAn8aR(9C+Xz_~74m*zZuF+vF~Z=?xMGJ^H%55?{T0bdf_i^jqPj11;aur>d~Q zi$s{NxauWj*-U0f$exlO0WT)t#~#r8q!;wU*x$vcRrD)OTIGatN;1H z@o@;XVX3udtIv6w9&zzjnoB3Nl^il52V;=7u9H!o%2)jpDU7zYeqmPNGwHV0ZJ9B} zN0KpHWo(oU#;MEK8GAS8>9VrH*qa2jNfRYihGnV^SY)|Unc*hMlx7SnI2!^N+{WPk zf(_k3ZMPWHu2C><3?%(AlBlR*%M35E52NF3B%@)6?*1Yk_;)CtOb%Y*XeG9{H d>UrKTa`S?0{0XSHJK6r(*g0$L{3QtH@_!YIs4V~h diff --git a/client/main.py b/client/main.py deleted file mode 100644 index 8f53247..0000000 --- a/client/main.py +++ /dev/null @@ -1,9 +0,0 @@ -import requests as req -import callSpec - - -testObj = callSpec.CallPacket(procedure="arstarst", simpleObjects={"arst":"qwfpp"}) -print(testObj.model_dump_json()) -x=req.post("http://127.0.0.1:7732/cliReq", json=testObj.model_dump()) -print(x.status_code) -print(x.text) diff --git a/harmoney/broker/callSpec.py b/harmoney/broker/callSpec.py new file mode 100644 index 0000000..9be115d --- /dev/null +++ b/harmoney/broker/callSpec.py @@ -0,0 +1,10 @@ +from typing import Any, Dict + +import pydantic + +class CallPacket(pydantic.BaseModel): + procedure: str + data: Dict[str, Any] = {} + +class ClientPacket(pydantic.BaseModel): + data: str diff --git a/broker/main.py b/harmoney/broker/main.py similarity index 68% rename from broker/main.py rename to harmoney/broker/main.py index 83690ba..f3106db 100644 --- a/broker/main.py +++ b/harmoney/broker/main.py @@ -1,12 +1,12 @@ import asyncio -from typing import Dict +import base64 from uvicorn import Config, Server import fastapi -from threading import Thread from uvicorn.config import LOG_LEVELS +import pickle as pkl -from callSpec import CallPacket +from callSpec import CallPacket, ClientPacket class Broker: @@ -24,21 +24,25 @@ class Broker: print("started") while True: - data = await self.taskQueue.get() + data:CallPacket = await self.taskQueue.get() # await asyncio.sleep(1) - await wsConnection.send_text(data) + await wsConnection.send_bytes(pkl.dumps(data)) + print(await wsConnection.receive()) print(f"left: {self.taskQueue.qsize()}") - async def clientRequest(self, data:CallPacket): + async def clientRequest(self, data:ClientPacket): print(data) + callPacket = pkl.loads(base64.b64decode(data.data)) + await self.taskQueue.put(callPacket) + print(self.taskQueue.qsize) # await self.taskQueue.put(name) def runBroker(): br = Broker() app = fastapi.FastAPI() app.include_router(br.router) - serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["trace"], ws_ping_interval=10, ws_ping_timeout=None) + serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["debug"], ws_ping_interval=10, ws_ping_timeout=None) server = Server(config=serverConf) server.run() diff --git a/client/callSpec.py b/harmoney/client/callSpec.py similarity index 57% rename from client/callSpec.py rename to harmoney/client/callSpec.py index 419c8f3..7ecbe48 100644 --- a/client/callSpec.py +++ b/harmoney/client/callSpec.py @@ -4,5 +4,4 @@ import pydantic class CallPacket(pydantic.BaseModel): procedure: str - simpleObjects: Dict[str, Any] = {} - byteObjects: Dict[str, bytes] = {} + data: Dict[str, Any ] = {} diff --git a/harmoney/client/main.py b/harmoney/client/main.py new file mode 100644 index 0000000..030496a --- /dev/null +++ b/harmoney/client/main.py @@ -0,0 +1,12 @@ +import requests as req +import callSpec +import numpy as np +import pickle as pkl +import base64 +testNPArray = np.ones((3,3,3)) +testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} ) +x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) +testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} ) +x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) +print(x.status_code) +print(x.text) diff --git a/broker/callSpec.py b/harmoney/runner/callSpec.py similarity index 57% rename from broker/callSpec.py rename to harmoney/runner/callSpec.py index 419c8f3..7ecbe48 100644 --- a/broker/callSpec.py +++ b/harmoney/runner/callSpec.py @@ -4,5 +4,4 @@ import pydantic class CallPacket(pydantic.BaseModel): procedure: str - simpleObjects: Dict[str, Any] = {} - byteObjects: Dict[str, bytes] = {} + data: Dict[str, Any ] = {} diff --git a/harmoney/runner/mail.py b/harmoney/runner/mail.py new file mode 100644 index 0000000..ac064ab --- /dev/null +++ b/harmoney/runner/mail.py @@ -0,0 +1,23 @@ +from websockets.asyncio import client as WSC +import asyncio +import pickle as pkl +import numpy as np + +from callSpec import CallPacket + +async def test(funcMap): + counter=0 + async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w: + while True: + counter+=1 + packetBytes=await w.recv() + callPk:CallPacket = pkl.loads(packetBytes) + # await asyncio.sleep(1) + print(callPk) + print(funcMap[callPk.procedure](**callPk.data)) + await w.send(str(counter), text=True) + + +funcMapping = {"sum": np.sum, "avg": np.average} + +asyncio.run(test(funcMapping)) diff --git a/runner/mail.py b/runner/mail.py deleted file mode 100644 index 7796f69..0000000 --- a/runner/mail.py +++ /dev/null @@ -1,18 +0,0 @@ -import requests as req -from websockets.asyncio import client as WSC -import asyncio - -async def test(func): - counter=0 - async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w: - while True: - counter+=1 - x=await w.recv() - await asyncio.sleep(1) - func() - print(x) - await w.send(str(counter), text=True) - - - -asyncio.run(test(print))