diff --git a/broker/__pycache__/callSpec.cpython-311.pyc b/broker/__pycache__/callSpec.cpython-311.pyc new file mode 100644 index 0000000..5b374e9 Binary files /dev/null and b/broker/__pycache__/callSpec.cpython-311.pyc differ diff --git a/broker/callSpec.py b/broker/callSpec.py new file mode 100644 index 0000000..419c8f3 --- /dev/null +++ b/broker/callSpec.py @@ -0,0 +1,8 @@ +from typing import Any, Dict + +import pydantic + +class CallPacket(pydantic.BaseModel): + procedure: str + simpleObjects: Dict[str, Any] = {} + byteObjects: Dict[str, bytes] = {} diff --git a/broker/main.py b/broker/main.py new file mode 100644 index 0000000..83690ba --- /dev/null +++ b/broker/main.py @@ -0,0 +1,45 @@ +import asyncio +from typing import Dict +from uvicorn import Config, Server +import fastapi +from threading import Thread + +from uvicorn.config import LOG_LEVELS + +from callSpec import CallPacket + + +class Broker: + def __init__(self) -> None: + self.runnerQueue = [] + self.router = fastapi.APIRouter() + self.router.add_api_websocket_route("/reg", self.registerRunner) + self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"]) + self.taskQueue = asyncio.Queue() + + + async def registerRunner(self, wsConnection: fastapi.WebSocket): + + await wsConnection.accept() + + print("started") + while True: + data = await self.taskQueue.get() + # await asyncio.sleep(1) + await wsConnection.send_text(data) + print(await wsConnection.receive()) + print(f"left: {self.taskQueue.qsize()}") + + async def clientRequest(self, data:CallPacket): + print(data) + # await self.taskQueue.put(name) + +def runBroker(): + br = Broker() + app = fastapi.FastAPI() + app.include_router(br.router) + serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["trace"], ws_ping_interval=10, ws_ping_timeout=None) + server = Server(config=serverConf) + server.run() + +runBroker() diff --git a/broker/taskQueue.py b/broker/taskQueue.py new file mode 100644 index 0000000..4258408 --- /dev/null +++ b/broker/taskQueue.py @@ -0,0 +1,51 @@ +""" +Task queue manager class, courtesy of Shashwat Kumar +https://medium.com/@shashwat_ds/a-tiny-multi-threaded-job-queue-in-30-lines-of-python-a344c3f3f7f0 + +Later modified by Phani Pavan K to meet PEP8.1. +Stage 2 modifications include adding multiprocessing for scale up. +""" + +import queue +from threading import Thread +from multiprocessing import Process +import time + +class TaskQueue(queue.Queue): + def __init__(self, num_workers=1): + queue.Queue.__init__(self) + self.num_workers = num_workers + self.startWorkers() + + def addTask(self, task, *args, **kwargs): + args = args or () + kwargs = kwargs or {} + self.put((task, args, kwargs)) + + def startWorkers(self): + for _ in range(self.num_workers): + t = Thread(target=self.worker) + t.daemon = True + t.start() + + def worker(self): + while True: + item, args, kwargs = self.get() + p=Process(target=item, args=args, kwargs=kwargs) + p.start() + p.join() + self.task_done() + +if __name__ == "__main__": + testQueue = TaskQueue(4) + + def printer(x): + for i in range(3): + print(x) + time.sleep(1) + + for i in range(10): + testQueue.addTask(printer, f"Task {i}") + print(f"Added task {i}") + print("Done adding tasks to the queue") + testQueue.join() diff --git a/client/__pycache__/callSpec.cpython-313.pyc b/client/__pycache__/callSpec.cpython-313.pyc new file mode 100644 index 0000000..fd55df1 Binary files /dev/null and b/client/__pycache__/callSpec.cpython-313.pyc differ diff --git a/client/callSpec.py b/client/callSpec.py new file mode 100644 index 0000000..419c8f3 --- /dev/null +++ b/client/callSpec.py @@ -0,0 +1,8 @@ +from typing import Any, Dict + +import pydantic + +class CallPacket(pydantic.BaseModel): + procedure: str + simpleObjects: Dict[str, Any] = {} + byteObjects: Dict[str, bytes] = {} diff --git a/client/main.py b/client/main.py new file mode 100644 index 0000000..8f53247 --- /dev/null +++ b/client/main.py @@ -0,0 +1,9 @@ +import requests as req +import callSpec + + +testObj = callSpec.CallPacket(procedure="arstarst", simpleObjects={"arst":"qwfpp"}) +print(testObj.model_dump_json()) +x=req.post("http://127.0.0.1:7732/cliReq", json=testObj.model_dump()) +print(x.status_code) +print(x.text) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..faaadf3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +msgspec +fastapi diff --git a/runner/mail.py b/runner/mail.py new file mode 100644 index 0000000..7796f69 --- /dev/null +++ b/runner/mail.py @@ -0,0 +1,18 @@ +import requests as req +from websockets.asyncio import client as WSC +import asyncio + +async def test(func): + counter=0 + async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w: + while True: + counter+=1 + x=await w.recv() + await asyncio.sleep(1) + func() + print(x) + await w.send(str(counter), text=True) + + + +asyncio.run(test(print))