Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f6cc3aeaf | |||
| 2d601d5ce1 | |||
| fc2e4179c4 | |||
|
|
f2cc63828a | ||
| 0b8006467f |
46
README.md
46
README.md
@@ -9,54 +9,74 @@ Distributed Function Caller Framework for Python
|
|||||||
Dependencies:
|
Dependencies:
|
||||||
- websockets
|
- websockets
|
||||||
- fastapi
|
- fastapi
|
||||||
- requests
|
|
||||||
- uvicorn
|
- uvicorn
|
||||||
- pydantic
|
- pydantic
|
||||||
|
|
||||||
## Usage:
|
## Usage:
|
||||||
|
|
||||||
Requires 3 scripts: Client, Broker and Runner
|
Requires 3 scripts: Client, Router and Runner
|
||||||
|
|
||||||
- Broker will mediate load balancing and connection handling, so this should start first. One port should be open.
|
### 1. Router
|
||||||
|
|
||||||
Let broker's IP be `192.168.0.110` and port be `7732`
|
The Router will mediate load balancing and connection handling between the function runners and callers, so this should start first. One port must to facilitate the communication.
|
||||||
|
|
||||||
|
This example starts the router on IP `192.168.0.110` and port `7732`.
|
||||||
```python
|
```python
|
||||||
|
|
||||||
from harmoney import router as rou
|
from harmoney.router import startRouter
|
||||||
|
|
||||||
ro.startRouter("0.0.0.0", 7732)
|
startRouter("0.0.0.0", 7732)
|
||||||
```
|
```
|
||||||
|
|
||||||
- Runner performs the calculations, should contain function definitions. Connects to broker using broker's IP.
|
### 2. Runner
|
||||||
|
|
||||||
|
Runner exposes the function calls and contains all the heavy compute logic. Below code connects one instance of runner to the router.
|
||||||
|
|
||||||
```python
|
```python
|
||||||
|
|
||||||
from harmoney import runners as run
|
from harmoney.runner import startRunner
|
||||||
|
|
||||||
def customFunction(arg1: int, arg2: str) -> str:
|
def customFunction(arg1: int, arg2: str) -> str:
|
||||||
return arg2*arg1
|
return arg2*arg1
|
||||||
|
|
||||||
funcs = {"custFn": customFunction}
|
funcs = {"custFn": customFunction}
|
||||||
|
|
||||||
run.startRunner(funcs, "192.168.0.110", 7732)
|
startRunner(funcs, "192.168.0.110", 7732)
|
||||||
```
|
```
|
||||||
|
|
||||||
- Client is the main caller of functions. Will contain your main code.
|
Functions must return objects that are pickleable. `None` or no return statement is valid.
|
||||||
|
|
||||||
|
### 3. Client
|
||||||
|
|
||||||
|
Client requests the function - argument combinations to run on runners.
|
||||||
|
|
||||||
```python
|
```python
|
||||||
|
|
||||||
from harmoney import client as cli
|
from harmoney.client import Client
|
||||||
|
|
||||||
cli.Client("192.168.0.110", 7732)
|
cli = Client("192.168.0.110", 7732)
|
||||||
|
|
||||||
retVal = cli.runSingle("custFn", arg1=10, arg2="arst")
|
retVal = cli.runSingle("custFn", arg1=10, arg2="arst") # run only this combination
|
||||||
|
|
||||||
print(retVal)
|
print(retVal)
|
||||||
|
|
||||||
|
cli.addCall("custFn", arg1=123, arg2="qwf") # add this combination to queue
|
||||||
|
cli.addCall("custFn", arg1=321, arg2="ars") # add this combination to queue
|
||||||
|
res = cli.runAllCalls() # send queued combs at a time.
|
||||||
|
print(res) # Print all the results
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Return values from `runALlCalls` are in the order of their function combination in the queue.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
TODO:
|
TODO:
|
||||||
- [ ] Error catching, keeping the connection to the broker
|
- [ ] Error catching, keeping the connection to the broker
|
||||||
- [ ] Error info should return to the client
|
- [ ] Error info should return to the client
|
||||||
- [ ] Remove dependency on fastapi and requests, move to completely to websockets
|
- [ ] Remove dependency on fastapi and requests, move to completely to websockets
|
||||||
|
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
~ A Grammer Society Project.
|
||||||
4
harmoney.md
Normal file
4
harmoney.md
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
- run multiple processes for given definition automatically
|
||||||
|
- add debug flags for print statements
|
||||||
|
- add string method for call packet, for pretty printing
|
||||||
|
- remove disconnected servers from router
|
||||||
3
harmoney/client/__init__.py
Normal file
3
harmoney/client/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
from .client import Client
|
||||||
|
|
||||||
|
__all__ = ["Client"]
|
||||||
@@ -1,44 +1,60 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from ._callSpec import _CallPacket
|
|
||||||
import pickle as pkl
|
import pickle as pkl
|
||||||
|
|
||||||
from websockets.asyncio import client as WSC
|
from websockets.asyncio import client as WSC
|
||||||
|
|
||||||
|
from .._callSpec import _CallPacket
|
||||||
|
|
||||||
__all__ = ["Client"]
|
__all__ = ["Client"]
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
class Client:
|
||||||
def __init__(self, host, port) -> None:
|
def __init__(self, host: str, port: str | int) -> None:
|
||||||
self._wsURL = f"ws://{host}:{port}/cliReq/"
|
self._wsURL = f"ws://{host}:{port}/cliReq/"
|
||||||
self.tasks = []
|
self.tasks = []
|
||||||
|
|
||||||
def singleCall(self, function, **kwargs):
|
def singleCall(self, function: str, **kwargs):
|
||||||
"""
|
"""
|
||||||
Performs single call with the provided function and args
|
Performs single call with the provided function and args
|
||||||
"""
|
"""
|
||||||
self.addCall(_CallPacket(procedure=function, data=kwargs))
|
self.addCall(function, **kwargs)
|
||||||
return self.runAllCalls()[0]
|
return self.runAllCalls()[0]
|
||||||
|
|
||||||
def addCall(self, function, **kwargs):
|
def addCall(self, function: str, **kwargs) -> int:
|
||||||
"""
|
"""
|
||||||
Adds a task to call queue.
|
Adds a task to call queue.
|
||||||
|
Returns current length of call queue.
|
||||||
"""
|
"""
|
||||||
self.tasks.append((_CallPacket(procedure=function, data=kwargs)))
|
self.tasks.append((_CallPacket(procedure=function, data=kwargs)))
|
||||||
|
return len(self.tasks)
|
||||||
|
|
||||||
async def _runAllCalls(self, callDelay=0.01):
|
async def _runAllCalls(self) -> list:
|
||||||
"""
|
"""
|
||||||
Logic function to communicate with the router.
|
Logic function to communicate with the router.
|
||||||
"""
|
"""
|
||||||
print(f"Total Calls: {len(self.tasks)}")
|
if len(self.tasks) == 0:
|
||||||
async with WSC.connect(self._wsURL+f"{len(self.tasks)}", open_timeout=None, ping_interval=10, ping_timeout=None) as ws:
|
return []
|
||||||
|
print(f"Running {len(self.tasks)} functions")
|
||||||
|
async with WSC.connect(
|
||||||
|
self._wsURL + f"{len(self.tasks)}",
|
||||||
|
open_timeout=None,
|
||||||
|
ping_interval=10,
|
||||||
|
ping_timeout=None,
|
||||||
|
) as ws:
|
||||||
ackCount = int(await ws.recv())
|
ackCount = int(await ws.recv())
|
||||||
assert ackCount == len(self.tasks), "Comms not proper..."
|
assert ackCount == len(self.tasks), "Comms not proper..."
|
||||||
await ws.send(pkl.dumps(self.tasks))
|
await ws.send(pkl.dumps(self.tasks))
|
||||||
returnData = await ws.recv()
|
returnData = await ws.recv()
|
||||||
returnData = pkl.loads(returnData)
|
returnData = pkl.loads(returnData)
|
||||||
self.tasks=[]
|
self.tasks.clear()
|
||||||
return returnData
|
return returnData
|
||||||
|
|
||||||
def runAllCalls(self, callDelay=0.01):
|
def runAllCalls(self):
|
||||||
"""
|
"""
|
||||||
User facing function to remotely execute queued tasks.
|
User facing function to remotely execute queued tasks.
|
||||||
"""
|
"""
|
||||||
return asyncio.run(self._runAllCalls(callDelay=callDelay))
|
return asyncio.run(self._runAllCalls())
|
||||||
|
|
||||||
|
def clearQueue(self) -> bool:
|
||||||
|
self.tasks.clear()
|
||||||
|
return True
|
||||||
@@ -1,107 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import base64
|
|
||||||
from threading import Thread
|
|
||||||
from typing import List
|
|
||||||
from uvicorn import Config, Server
|
|
||||||
import fastapi
|
|
||||||
from uvicorn.config import LOG_LEVELS
|
|
||||||
import pickle as pkl
|
|
||||||
import uuid
|
|
||||||
from ._callSpec import _ClientPacket, _CallPacket
|
|
||||||
|
|
||||||
__all__ = ["startRouter"]
|
|
||||||
|
|
||||||
class _Router:
|
|
||||||
def __init__(self, pollingDelay=0.5) -> None:
|
|
||||||
self.router = fastapi.APIRouter()
|
|
||||||
self.router.add_api_websocket_route("/reg", self.registerRunner)
|
|
||||||
self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
|
|
||||||
self.taskQueue = asyncio.Queue()
|
|
||||||
self.runnerCount=0
|
|
||||||
self.returnDict = {}
|
|
||||||
self.doneDict = {}
|
|
||||||
self.pollingDelay = pollingDelay
|
|
||||||
|
|
||||||
async def registerRunner(self, wsConnection: fastapi.WebSocket):
|
|
||||||
"""
|
|
||||||
Method which queries an available task and sends the data to the attached runner.
|
|
||||||
"""
|
|
||||||
await wsConnection.accept()
|
|
||||||
await wsConnection.send_text(str(self.runnerCount))
|
|
||||||
methods=await wsConnection.receive()
|
|
||||||
methods = pkl.loads(base64.b64decode(methods["text"]))
|
|
||||||
print(f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}")
|
|
||||||
runnerID=self.runnerCount
|
|
||||||
self.runnerCount+=1
|
|
||||||
runnerCounter = 0
|
|
||||||
while True:
|
|
||||||
reqID, data = await self.taskQueue.get()
|
|
||||||
runnerCounter+=1
|
|
||||||
print(f"Runr {runnerID} Counter: {runnerCounter}")
|
|
||||||
await wsConnection.send_bytes(pkl.dumps(data))
|
|
||||||
retValue = await wsConnection.receive()
|
|
||||||
self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
|
|
||||||
|
|
||||||
async def clientRequest(self, data:_ClientPacket):
|
|
||||||
"""
|
|
||||||
Method to handle single request, adds the task to queue and awaits for result.
|
|
||||||
To be deprecated for better task handling.
|
|
||||||
"""
|
|
||||||
reqID = uuid.uuid4().hex
|
|
||||||
callPacket = data
|
|
||||||
await self.taskQueue.put((reqID, callPacket))
|
|
||||||
while reqID not in self.returnDict:
|
|
||||||
await asyncio.sleep(self.pollingDelay)
|
|
||||||
returnValue = self.returnDict[reqID]
|
|
||||||
self.returnDict.pop(reqID)
|
|
||||||
return returnValue
|
|
||||||
|
|
||||||
async def multiClientRequest(self, wsConn:fastapi.WebSocket, count:int):
|
|
||||||
"""
|
|
||||||
Method accepts a task list and adds them to the queue.
|
|
||||||
Returns the results to client.
|
|
||||||
"""
|
|
||||||
await wsConn.accept()
|
|
||||||
softLimit=50
|
|
||||||
await wsConn.send_text(str(count))
|
|
||||||
reqID = uuid.uuid4().hex
|
|
||||||
self.returnDict[reqID] = [0]*count
|
|
||||||
self.doneDict[reqID] = [0]*count
|
|
||||||
print(f"Received {count} tasks")
|
|
||||||
taskBytes = await wsConn.receive_bytes()
|
|
||||||
taskPackets = pkl.loads(taskBytes)
|
|
||||||
softLimitItr = 0
|
|
||||||
for task in range(len(taskPackets)):
|
|
||||||
while (task > (softLimitItr+softLimit)) and not self.doneDict[reqID][softLimitItr]==1:
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
if self.doneDict[reqID][softLimitItr]==1:
|
|
||||||
softLimitItr+=1
|
|
||||||
t=Thread(target=self._worker, args=(reqID, task, taskPackets[task]))
|
|
||||||
t.daemon=True
|
|
||||||
t.start()
|
|
||||||
while not all(self.doneDict[reqID]):
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID]))
|
|
||||||
self.returnDict.pop(reqID)
|
|
||||||
|
|
||||||
def _worker(self, id, idx, data:_ClientPacket):
|
|
||||||
"""
|
|
||||||
Thread worker to handle one task.
|
|
||||||
To be depricated for better task handling.
|
|
||||||
"""
|
|
||||||
retVal = asyncio.run(self.clientRequest(data))
|
|
||||||
self.returnDict[id][idx]=retVal
|
|
||||||
self.doneDict[id][idx]=1
|
|
||||||
return
|
|
||||||
|
|
||||||
def startRouter(host, port, pollingDelay=0.1, logLevel=3):
|
|
||||||
"""
|
|
||||||
Main function to start the router system.
|
|
||||||
"""
|
|
||||||
br = _Router(pollingDelay=pollingDelay)
|
|
||||||
app = fastapi.FastAPI()
|
|
||||||
app.include_router(br.router)
|
|
||||||
level = list(LOG_LEVELS.keys())[logLevel]
|
|
||||||
serverConf = Config(app = app, host=host, port=port, log_level=LOG_LEVELS[level], ws_ping_interval=10, ws_ping_timeout=None, ws_max_size=1024*1024*1024)
|
|
||||||
server = Server(config=serverConf)
|
|
||||||
server.run()
|
|
||||||
3
harmoney/router/__init__.py
Normal file
3
harmoney/router/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
from .router import startRouter
|
||||||
|
|
||||||
|
__all__ = ["startRouter"]
|
||||||
144
harmoney/router/router.py
Normal file
144
harmoney/router/router.py
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
import asyncio
|
||||||
|
import base64
|
||||||
|
import pickle as pkl
|
||||||
|
import uuid
|
||||||
|
from threading import Lock, Thread
|
||||||
|
|
||||||
|
import fastapi
|
||||||
|
from uvicorn import Config, Server
|
||||||
|
from uvicorn.config import LOG_LEVELS
|
||||||
|
|
||||||
|
from .._callSpec import _ClientPacket
|
||||||
|
|
||||||
|
__all__ = ["startRouter"]
|
||||||
|
|
||||||
|
|
||||||
|
class _Router:
|
||||||
|
def __init__(
|
||||||
|
self, pollingDelay: float | int = 0.5, recycleOnError: bool = False
|
||||||
|
) -> None:
|
||||||
|
self.router = fastapi.APIRouter()
|
||||||
|
self.router.add_api_websocket_route("/reg", self.registerRunner)
|
||||||
|
self.router.add_api_websocket_route("/cliReq/{count}", self.multiClientRequest)
|
||||||
|
self.taskQueue = asyncio.Queue()
|
||||||
|
self.runnerCount = 0
|
||||||
|
self.returnDict = {}
|
||||||
|
self.doneDict = {}
|
||||||
|
self.pollingDelay = pollingDelay
|
||||||
|
self.recycleOnError = recycleOnError
|
||||||
|
|
||||||
|
async def registerRunner(self, wsConnection: fastapi.WebSocket):
|
||||||
|
"""
|
||||||
|
Method which queries an available task and sends the data to the attached runner.
|
||||||
|
"""
|
||||||
|
l = Lock()
|
||||||
|
l.acquire()
|
||||||
|
await wsConnection.accept()
|
||||||
|
await wsConnection.send_text(str(self.runnerCount))
|
||||||
|
methods = await wsConnection.receive()
|
||||||
|
methods = pkl.loads(base64.b64decode(methods["text"]))
|
||||||
|
print(
|
||||||
|
f"Runner Connected with ID: {self.runnerCount}, Methods: {methods['methods']}"
|
||||||
|
)
|
||||||
|
runnerID = self.runnerCount
|
||||||
|
self.runnerCount += 1
|
||||||
|
runnerCounter = 0
|
||||||
|
l.release()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
# add this id back into available id pools using a shared list.
|
||||||
|
reqID, data = await self.taskQueue.get()
|
||||||
|
runnerCounter += 1
|
||||||
|
print(f"Runr {runnerID} Counter: {runnerCounter}")
|
||||||
|
await wsConnection.send_bytes(pkl.dumps(data))
|
||||||
|
retValue = await wsConnection.receive()
|
||||||
|
if "bytes" not in retValue:
|
||||||
|
self.returnDict[reqID] = data
|
||||||
|
raise Exception(
|
||||||
|
f"Runner {runnerID} Crashed!! Check for function error logs on the Runner."
|
||||||
|
)
|
||||||
|
|
||||||
|
self.returnDict[reqID] = pkl.loads(base64.b64decode(retValue["bytes"]))
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
if self.recycleOnError and "reqID" in locals() and "data" in locals():
|
||||||
|
await self.taskQueue.put((reqID, data))
|
||||||
|
print("Recycled this task")
|
||||||
|
# await wsConnection.close()
|
||||||
|
print(f"Runner {runnerID} Closed")
|
||||||
|
|
||||||
|
async def clientRequest(self, data: _ClientPacket):
|
||||||
|
"""
|
||||||
|
Method to handle single request, adds the task to queue and awaits for result.
|
||||||
|
To be deprecated for better task handling.
|
||||||
|
"""
|
||||||
|
reqID = uuid.uuid4().hex
|
||||||
|
callPacket = data
|
||||||
|
await self.taskQueue.put((reqID, callPacket))
|
||||||
|
while reqID not in self.returnDict:
|
||||||
|
await asyncio.sleep(self.pollingDelay)
|
||||||
|
returnValue = self.returnDict.pop(reqID)
|
||||||
|
return returnValue
|
||||||
|
|
||||||
|
async def multiClientRequest(self, wsConn: fastapi.WebSocket, count: int):
|
||||||
|
"""
|
||||||
|
Method accepts a task list and adds them to the queue.
|
||||||
|
Returns the results to client.
|
||||||
|
"""
|
||||||
|
await wsConn.accept()
|
||||||
|
softLimit = 50
|
||||||
|
await wsConn.send_text(str(count))
|
||||||
|
reqID = uuid.uuid4().hex
|
||||||
|
self.returnDict[reqID] = [0] * count
|
||||||
|
self.doneDict[reqID] = [0] * count
|
||||||
|
# print(f"Received {count} tasks")
|
||||||
|
taskBytes = await wsConn.receive_bytes()
|
||||||
|
taskPackets = pkl.loads(taskBytes)
|
||||||
|
softLimitItr = 0
|
||||||
|
for task in range(len(taskPackets)):
|
||||||
|
while (task > (softLimitItr + softLimit)) and not self.doneDict[reqID][
|
||||||
|
softLimitItr
|
||||||
|
] == 1:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
if self.doneDict[reqID][softLimitItr] == 1:
|
||||||
|
softLimitItr += 1
|
||||||
|
t = Thread(target=self._worker, args=(reqID, task, taskPackets[task]))
|
||||||
|
t.daemon = True
|
||||||
|
t.start()
|
||||||
|
while not all(self.doneDict[reqID]):
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
await wsConn.send_bytes(pkl.dumps(self.returnDict[reqID]))
|
||||||
|
self.returnDict.pop(reqID)
|
||||||
|
|
||||||
|
def _worker(self, id, idx, data: _ClientPacket):
|
||||||
|
"""
|
||||||
|
Thread worker to handle one task.
|
||||||
|
To be depricated for better task handling.
|
||||||
|
"""
|
||||||
|
retVal = asyncio.run(self.clientRequest(data))
|
||||||
|
self.returnDict[id][idx] = retVal
|
||||||
|
self.doneDict[id][idx] = 1
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def startRouter(
|
||||||
|
host: str, port: str | int, pollingDelay: float | int = 0.1, logLevel: int = 3
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Main function to start the router system.
|
||||||
|
"""
|
||||||
|
br = _Router(pollingDelay=pollingDelay)
|
||||||
|
app = fastapi.FastAPI()
|
||||||
|
app.include_router(br.router)
|
||||||
|
level = list(LOG_LEVELS.keys())[logLevel]
|
||||||
|
serverConf = Config(
|
||||||
|
app=app,
|
||||||
|
host=host,
|
||||||
|
port=int(port),
|
||||||
|
log_level=LOG_LEVELS[level],
|
||||||
|
ws_ping_interval=10,
|
||||||
|
ws_ping_timeout=None,
|
||||||
|
ws_max_size=1024 * 1024 * 1024,
|
||||||
|
)
|
||||||
|
server = Server(config=serverConf)
|
||||||
|
server.run()
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
import base64
|
|
||||||
from typing import Any, Dict
|
|
||||||
from websockets.asyncio import client as WSC
|
|
||||||
from websockets.exceptions import WebSocketException
|
|
||||||
import asyncio
|
|
||||||
import pickle as pkl
|
|
||||||
from ._callSpec import _CallPacket
|
|
||||||
|
|
||||||
__all__ = ["startRunner"]
|
|
||||||
|
|
||||||
async def _send(funcMap: Dict[str, Any], url):
|
|
||||||
"""
|
|
||||||
Main logic funcion, connects to the router, takes the incoming task, executes and returns the result.
|
|
||||||
To improve error handling from the mapped function side.
|
|
||||||
"""
|
|
||||||
counter=0
|
|
||||||
async with WSC.connect(url, open_timeout=None, ping_interval=10, ping_timeout=None ) as w:
|
|
||||||
try:
|
|
||||||
id = await w.recv()
|
|
||||||
id = int(id)
|
|
||||||
print(f"Starting Runner, ID: {id}")
|
|
||||||
await w.send(base64.b64encode(pkl.dumps({"methods":list(funcMap.keys())})).decode("utf-8"))
|
|
||||||
while True:
|
|
||||||
packetBytes=await w.recv()
|
|
||||||
counter+=1
|
|
||||||
callPk:_CallPacket = pkl.loads(packetBytes)
|
|
||||||
print("-"*50 + f"\nRunning: {callPk.procedure}\nArgs: {callPk.data}\nCounter: {counter}\n" + "-"*50)
|
|
||||||
funcOutput = funcMap[callPk.procedure](**callPk.data)
|
|
||||||
await w.send(base64.b64encode(pkl.dumps(funcOutput)))
|
|
||||||
except WebSocketException as e:
|
|
||||||
print(f"Closing Conncetion with Broker, total call count: {counter}")
|
|
||||||
await w.close()
|
|
||||||
|
|
||||||
def startRunner(funcMapping, host, port):
|
|
||||||
"""
|
|
||||||
Main function to call from the user code.
|
|
||||||
"""
|
|
||||||
asyncio.run(_send(funcMapping, f"ws://{host}:{port}/reg"))
|
|
||||||
3
harmoney/runner/__init__.py
Normal file
3
harmoney/runner/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
from .runner import startRunner
|
||||||
|
|
||||||
|
__all__ = ["startRunner"]
|
||||||
59
harmoney/runner/runner.py
Normal file
59
harmoney/runner/runner.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import asyncio
|
||||||
|
import base64
|
||||||
|
import pickle as pkl
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
from websockets.asyncio import client as WSC
|
||||||
|
from websockets.exceptions import WebSocketException
|
||||||
|
|
||||||
|
from .._callSpec import _CallPacket
|
||||||
|
|
||||||
|
__all__ = ["startRunner"]
|
||||||
|
|
||||||
|
|
||||||
|
async def _loop(funcMap: Dict[str, Any], url: str, debug=False):
|
||||||
|
"""
|
||||||
|
Main logic funcion, connects to the router, takes the incoming task, executes and returns the result.
|
||||||
|
To improve error handling from the mapped function side.
|
||||||
|
"""
|
||||||
|
counter = 0
|
||||||
|
async with WSC.connect(
|
||||||
|
url, open_timeout=None, ping_interval=10, ping_timeout=None
|
||||||
|
) as w:
|
||||||
|
try:
|
||||||
|
id = await w.recv()
|
||||||
|
id = int(id)
|
||||||
|
print(f"Starting Runner with ID: {id}")
|
||||||
|
await w.send(
|
||||||
|
base64.b64encode(pkl.dumps({"methods": list(funcMap.keys())})).decode(
|
||||||
|
"utf-8"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
while True:
|
||||||
|
packetBytes = await w.recv()
|
||||||
|
counter += 1
|
||||||
|
callPk: _CallPacket = pkl.loads(packetBytes)
|
||||||
|
if debug:
|
||||||
|
print(
|
||||||
|
"-" * 50
|
||||||
|
+ f"\nRunning: {callPk.procedure}\nCounter: {counter}\nArgs: {str(callPk.data)[:30]}{'...' if len(str(callPk.data)) >= 30 else ''}\n"
|
||||||
|
+ "-" * 50
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print("-" * 30)
|
||||||
|
funcOutput = funcMap[callPk.procedure](**callPk.data)
|
||||||
|
await w.send(base64.b64encode(pkl.dumps(funcOutput)))
|
||||||
|
except WebSocketException:
|
||||||
|
print(f"Connection closed with Router, total call count: {counter}")
|
||||||
|
await w.close()
|
||||||
|
|
||||||
|
|
||||||
|
def startRunner(funcMapping: Dict, host: str, port: str | int, debug: bool = False):
|
||||||
|
"""
|
||||||
|
Main function to call from the user code.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
port = int(str(port))
|
||||||
|
except:
|
||||||
|
raise TypeError(f"Port {port} not valid.")
|
||||||
|
asyncio.run(_loop(funcMapping, f"ws://{host}:{port}/reg", debug))
|
||||||
Reference in New Issue
Block a user