initial commit

This commit is contained in:
2025-02-28 17:40:39 +05:30
parent 796e995b2b
commit c39726c098
9 changed files with 141 additions and 0 deletions

Binary file not shown.

8
broker/callSpec.py Normal file
View File

@@ -0,0 +1,8 @@
from typing import Any, Dict
import pydantic
class CallPacket(pydantic.BaseModel):
procedure: str
simpleObjects: Dict[str, Any] = {}
byteObjects: Dict[str, bytes] = {}

45
broker/main.py Normal file
View File

@@ -0,0 +1,45 @@
import asyncio
from typing import Dict
from uvicorn import Config, Server
import fastapi
from threading import Thread
from uvicorn.config import LOG_LEVELS
from callSpec import CallPacket
class Broker:
def __init__(self) -> None:
self.runnerQueue = []
self.router = fastapi.APIRouter()
self.router.add_api_websocket_route("/reg", self.registerRunner)
self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"])
self.taskQueue = asyncio.Queue()
async def registerRunner(self, wsConnection: fastapi.WebSocket):
await wsConnection.accept()
print("started")
while True:
data = await self.taskQueue.get()
# await asyncio.sleep(1)
await wsConnection.send_text(data)
print(await wsConnection.receive())
print(f"left: {self.taskQueue.qsize()}")
async def clientRequest(self, data:CallPacket):
print(data)
# await self.taskQueue.put(name)
def runBroker():
br = Broker()
app = fastapi.FastAPI()
app.include_router(br.router)
serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["trace"], ws_ping_interval=10, ws_ping_timeout=None)
server = Server(config=serverConf)
server.run()
runBroker()

51
broker/taskQueue.py Normal file
View File

@@ -0,0 +1,51 @@
"""
Task queue manager class, courtesy of Shashwat Kumar
https://medium.com/@shashwat_ds/a-tiny-multi-threaded-job-queue-in-30-lines-of-python-a344c3f3f7f0
Later modified by Phani Pavan K to meet PEP8.1.
Stage 2 modifications include adding multiprocessing for scale up.
"""
import queue
from threading import Thread
from multiprocessing import Process
import time
class TaskQueue(queue.Queue):
def __init__(self, num_workers=1):
queue.Queue.__init__(self)
self.num_workers = num_workers
self.startWorkers()
def addTask(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or {}
self.put((task, args, kwargs))
def startWorkers(self):
for _ in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()
def worker(self):
while True:
item, args, kwargs = self.get()
p=Process(target=item, args=args, kwargs=kwargs)
p.start()
p.join()
self.task_done()
if __name__ == "__main__":
testQueue = TaskQueue(4)
def printer(x):
for i in range(3):
print(x)
time.sleep(1)
for i in range(10):
testQueue.addTask(printer, f"Task {i}")
print(f"Added task {i}")
print("Done adding tasks to the queue")
testQueue.join()