I am trying to hack together a basic generic client interface from the websockets documentation. I'm using this example as a starting point.
The following code sends 10 strings in a loop but either none of them are coming back from the echo server, or they are not reaching the consumer method
I have tried replacing a method in the code below with the following ...
async def consumer_handler(self, websocket, path):
try:
logger.debug("Beginning consumer")
while True:
try:
message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
logger.debug(f"Recieved message {message}")
await self.consumer(message)
except asyncio.TimeoutError:
logger.debug("Consumer timeout")
finally:
logger.debug("Ended consumer")
... but that doesnt make any difference. Further, neither the "Consumer Timeout" or "Received Message" logs are printed, so it seems like this loop is not running after the producer_handler is started (consumer_handler does get started as the "Beginning consumer" string is logged). I also note that the "Ended consumer" and "Ended producer" strings are never logged.
import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time
logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
class Application:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=8)
def getExecutor(self):
return self.executor
def close(self):
self.executor.shutdown()
class Client:
###########################################################################
# Business Logic
def sendMessage(self, message):
self.messageQueue.put(message)
async def consumer(self, message):
logger.debug(f"Consumed message {message}")
###########################################################################
###########################################################################
# General
def __init__(self, app):
self.app = app
self.messageQueue = Queue()
self.ws = None
async def producer(self):
logger.debug("In producer")
return self.messageQueue.get()
def connect(self, uri):
self.app.getExecutor().submit(partial(self._connect, uri))
def _connect(self, uri):
asyncio.run(self.__connect(uri), debug=True)
async def __connect(self, uri):
logger.debug("Connecting")
self.ws = await websockets.connect(uri)
await self.handler(self.ws, uri)
def close(self):
self.app.getExecutor().submit(self._close)
def _close(self):
asyncio.run(self.__close(), debug=True)
async def __close(self):
logger.debug("Ending client")
await self.ws.close()
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# remaining part of class is from
# https://websockets.readthedocs.io/en/stable/intro.html#both
async def consumer_handler(self, websocket, path):
try:
logger.debug("Beginning consumer")
async for message in websocket:
logger.debug(f"Recieved message {message}")
await self.consumer(message)
finally:
logger.debug("Ended consumer")
async def producer_handler(self, websocket, path):
logger.debug("Beginning producer")
try:
while True:
message = await self.producer()
await websocket.send(message)
logger.debug(f"Sent: {message}")
finally:
logger.debug("Ended producer")
async def handler(self, websocket, path):
consumer_task = asyncio.create_task(
self.consumer_handler(websocket, path))
producer_task = asyncio.create_task(
self.producer_handler(websocket, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
def main():
ADDRESS = "ws://echo.websocket.org/"
logger.debug("Beginning client")
app = Application()
try:
client = Client(app)
try:
client.connect(ADDRESS)
time.sleep(2)
for i in range(10):
client.sendMessage(f"Hello {i}")
time.sleep(0.5)
time.sleep(3)
finally:
client.close()
time.sleep(2)
finally:
app.close()
if __name__ == '__main__':
main()