restructure, properly working

This commit is contained in:
2025-03-01 16:02:57 +05:30
parent c39726c098
commit 5dfc478eda
11 changed files with 58 additions and 89 deletions

View File

@@ -1,51 +0,0 @@
"""
Task queue manager class, courtesy of Shashwat Kumar
https://medium.com/@shashwat_ds/a-tiny-multi-threaded-job-queue-in-30-lines-of-python-a344c3f3f7f0
Later modified by Phani Pavan K to meet PEP8.1.
Stage 2 modifications include adding multiprocessing for scale up.
"""
import queue
from threading import Thread
from multiprocessing import Process
import time
class TaskQueue(queue.Queue):
def __init__(self, num_workers=1):
queue.Queue.__init__(self)
self.num_workers = num_workers
self.startWorkers()
def addTask(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or {}
self.put((task, args, kwargs))
def startWorkers(self):
for _ in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()
def worker(self):
while True:
item, args, kwargs = self.get()
p=Process(target=item, args=args, kwargs=kwargs)
p.start()
p.join()
self.task_done()
if __name__ == "__main__":
testQueue = TaskQueue(4)
def printer(x):
for i in range(3):
print(x)
time.sleep(1)
for i in range(10):
testQueue.addTask(printer, f"Task {i}")
print(f"Added task {i}")
print("Done adding tasks to the queue")
testQueue.join()

View File

@@ -1,9 +0,0 @@
import requests as req
import callSpec
testObj = callSpec.CallPacket(procedure="arstarst", simpleObjects={"arst":"qwfpp"})
print(testObj.model_dump_json())
x=req.post("http://127.0.0.1:7732/cliReq", json=testObj.model_dump())
print(x.status_code)
print(x.text)

View File

@@ -0,0 +1,10 @@
from typing import Any, Dict
import pydantic
class CallPacket(pydantic.BaseModel):
procedure: str
data: Dict[str, Any] = {}
class ClientPacket(pydantic.BaseModel):
data: str

View File

@@ -1,12 +1,12 @@
import asyncio import asyncio
from typing import Dict import base64
from uvicorn import Config, Server from uvicorn import Config, Server
import fastapi import fastapi
from threading import Thread
from uvicorn.config import LOG_LEVELS from uvicorn.config import LOG_LEVELS
import pickle as pkl
from callSpec import CallPacket from callSpec import CallPacket, ClientPacket
class Broker: class Broker:
@@ -24,21 +24,25 @@ class Broker:
print("started") print("started")
while True: while True:
data = await self.taskQueue.get() data:CallPacket = await self.taskQueue.get()
# await asyncio.sleep(1) # await asyncio.sleep(1)
await wsConnection.send_text(data) await wsConnection.send_bytes(pkl.dumps(data))
print(await wsConnection.receive()) print(await wsConnection.receive())
print(f"left: {self.taskQueue.qsize()}") print(f"left: {self.taskQueue.qsize()}")
async def clientRequest(self, data:CallPacket): async def clientRequest(self, data:ClientPacket):
print(data) print(data)
callPacket = pkl.loads(base64.b64decode(data.data))
await self.taskQueue.put(callPacket)
print(self.taskQueue.qsize)
# await self.taskQueue.put(name) # await self.taskQueue.put(name)
def runBroker(): def runBroker():
br = Broker() br = Broker()
app = fastapi.FastAPI() app = fastapi.FastAPI()
app.include_router(br.router) app.include_router(br.router)
serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["trace"], ws_ping_interval=10, ws_ping_timeout=None) serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["debug"], ws_ping_interval=10, ws_ping_timeout=None)
server = Server(config=serverConf) server = Server(config=serverConf)
server.run() server.run()

View File

@@ -4,5 +4,4 @@ import pydantic
class CallPacket(pydantic.BaseModel): class CallPacket(pydantic.BaseModel):
procedure: str procedure: str
simpleObjects: Dict[str, Any] = {} data: Dict[str, Any ] = {}
byteObjects: Dict[str, bytes] = {}

12
harmoney/client/main.py Normal file
View File

@@ -0,0 +1,12 @@
import requests as req
import callSpec
import numpy as np
import pickle as pkl
import base64
testNPArray = np.ones((3,3,3))
testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} )
x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")})
testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} )
x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")})
print(x.status_code)
print(x.text)

View File

@@ -4,5 +4,4 @@ import pydantic
class CallPacket(pydantic.BaseModel): class CallPacket(pydantic.BaseModel):
procedure: str procedure: str
simpleObjects: Dict[str, Any] = {} data: Dict[str, Any ] = {}
byteObjects: Dict[str, bytes] = {}

23
harmoney/runner/mail.py Normal file
View File

@@ -0,0 +1,23 @@
from websockets.asyncio import client as WSC
import asyncio
import pickle as pkl
import numpy as np
from callSpec import CallPacket
async def test(funcMap):
counter=0
async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
while True:
counter+=1
packetBytes=await w.recv()
callPk:CallPacket = pkl.loads(packetBytes)
# await asyncio.sleep(1)
print(callPk)
print(funcMap[callPk.procedure](**callPk.data))
await w.send(str(counter), text=True)
funcMapping = {"sum": np.sum, "avg": np.average}
asyncio.run(test(funcMapping))

View File

@@ -1,18 +0,0 @@
import requests as req
from websockets.asyncio import client as WSC
import asyncio
async def test(func):
counter=0
async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
while True:
counter+=1
x=await w.recv()
await asyncio.sleep(1)
func()
print(x)
await w.send(str(counter), text=True)
asyncio.run(test(print))