package code

This commit is contained in:
2025-03-01 19:08:10 +05:30
parent 5dfc478eda
commit a1ef1d53d6
8 changed files with 71 additions and 43 deletions

5
harmoney/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from . import client
from . import runner
from . import broker
__all__ = ["client", "runner", "broker"]

View File

@@ -0,0 +1,3 @@
from .main import runBroker
__all__ = ["runBroker"]

View File

@@ -2,34 +2,33 @@ import asyncio
import base64 import base64
from uvicorn import Config, Server from uvicorn import Config, Server
import fastapi import fastapi
from uvicorn.config import LOG_LEVELS from uvicorn.config import LOG_LEVELS
import pickle as pkl import pickle as pkl
from callSpec import CallPacket, ClientPacket from callSpec import CallPacket, ClientPacket
__all__ = ["runBroker"]
class Broker: class Broker:
def __init__(self) -> None: def __init__(self) -> None:
self.runnerQueue = []
self.router = fastapi.APIRouter() self.router = fastapi.APIRouter()
self.router.add_api_websocket_route("/reg", self.registerRunner) self.router.add_api_websocket_route("/reg", self.registerRunner)
self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"]) self.router.add_api_route("/cliReq", self.clientRequest, methods=["POST"])
self.taskQueue = asyncio.Queue() self.taskQueue = asyncio.Queue()
self.runnerCount=0
async def registerRunner(self, wsConnection: fastapi.WebSocket): async def registerRunner(self, wsConnection: fastapi.WebSocket):
await wsConnection.accept() await wsConnection.accept()
await wsConnection.send_text(str(self.runnerCount))
print("started") methods=await wsConnection.receive_json()
print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}")
self.runnerCount+=1
while True: while True:
data:CallPacket = await self.taskQueue.get() data:CallPacket = await self.taskQueue.get()
# await asyncio.sleep(1) # await asyncio.sleep(1)
await wsConnection.send_bytes(pkl.dumps(data)) await wsConnection.send_bytes(pkl.dumps(data))
retValue = await wsConnection.receive()
print(await wsConnection.receive()) print(f"Tasks left: {self.taskQueue.qsize()}")
print(f"left: {self.taskQueue.qsize()}")
async def clientRequest(self, data:ClientPacket): async def clientRequest(self, data:ClientPacket):
print(data) print(data)
@@ -38,12 +37,12 @@ class Broker:
print(self.taskQueue.qsize) print(self.taskQueue.qsize)
# await self.taskQueue.put(name) # await self.taskQueue.put(name)
def runBroker(): def runBroker(host, port):
br = Broker() br = Broker()
app = fastapi.FastAPI() app = fastapi.FastAPI()
app.include_router(br.router) app.include_router(br.router)
serverConf = Config(app = app, host="0.0.0.0", port=7732, log_level=LOG_LEVELS["debug"], ws_ping_interval=10, ws_ping_timeout=None) serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS["warning"], ws_ping_interval=10, ws_ping_timeout=None)
server = Server(config=serverConf) server = Server(config=serverConf)
server.run() server.run()
runBroker() # runBroker()

View File

@@ -0,0 +1,3 @@
from .main import Client
__all__ = ["Client"]

View File

@@ -1,12 +1,23 @@
import requests as req import requests as req
import callSpec import callSpec
import numpy as np
import pickle as pkl import pickle as pkl
import base64 import base64
testNPArray = np.ones((3,3,3)) # testNPArray = np.random.random((3,3,3, 3))
testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} ) # testObj = callSpec.CallPacket(procedure="sum",data={"a":testNPArray} )
x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) # x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")})
testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} ) # testObj = callSpec.CallPacket(procedure="avg",data={"a":testNPArray} )
x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")}) # x=req.post("http://127.0.0.1:7732/cliReq", json={"data":base64.b64encode(pkl.dumps(testObj)).decode("utf-8")})
print(x.status_code) # print(x.status_code)
print(x.text) # print(x.text)
class Client:
def __init__(self, host, port) -> None:
self.url = f"http://{host}:{port}/cliReq"
def rCall(self, function, **kwargs):
callPacket = callSpec.CallPacket(procedure=function, data=kwargs)
payload = {"data": base64.b64encode(pkl.dumps(callPacket)).decode("utf-8")}
resp = req.post(self.url, json=payload)
print(resp.status_code)
print(resp.text)
return resp

View File

@@ -0,0 +1,3 @@
from .main import startRunner
__all__ = ["startRunner"]

View File

@@ -1,23 +0,0 @@
from websockets.asyncio import client as WSC
import asyncio
import pickle as pkl
import numpy as np
from callSpec import CallPacket
async def test(funcMap):
counter=0
async with WSC.connect("ws://0.0.0.0:7732/reg", open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
while True:
counter+=1
packetBytes=await w.recv()
callPk:CallPacket = pkl.loads(packetBytes)
# await asyncio.sleep(1)
print(callPk)
print(funcMap[callPk.procedure](**callPk.data))
await w.send(str(counter), text=True)
funcMapping = {"sum": np.sum, "avg": np.average}
asyncio.run(test(funcMapping))

27
harmoney/runner/main.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import Any, Dict
from websockets.asyncio import client as WSC
import asyncio
import pickle as pkl
from callSpec import CallPacket
__all__ = ["startRunner"]
async def test(funcMap: Dict[str, Any], url):
counter=0
async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
id = await w.recv()
id = int(id)
print(f"Starting Runner, ID: {id}")
await w.send({"methods":list(funcMap.keys())})
while True:
counter+=1
packetBytes=await w.recv()
callPk:CallPacket = pkl.loads(packetBytes)
print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\n" + "-"*25)
funcOutput = funcMap[callPk.procedure](**callPk.data)
await w.send(str(counter))
def startRunner(funcMapping, host, port):
asyncio.run(test(funcMapping, f"ws://{host}:{port}/reg"))