1

I have written a Django application to receive updates from long-running tasks. I am using Redis as the channel layer for WebSockets, and I also use Redis Streams to store the updates of the tasks that are currently running. From the connect method of the WebSocket consumer, I spawn a thread that runs an infinite loop to continuously check for updates through a Redis Streams consumer. This seems like a hackish way to achieve what I want, i.e. to remain connected to the WebSocket while simultaneously receiving updates from Redis Streams.

This is my code for Websocket Consumer. As you can see, I am running an infinite loop to check for any new messages in the stream through Redis Consumer.

import json
from threading import Thread

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from django.utils import timezone
from redis import ConnectionPool, Redis
from redis_streams.consumer import Consumer


class ProgressStateConsumer(WebsocketConsumer):
    """WebSocket consumer that forwards task progress from a Redis Stream.

    On connect, a background thread polls the stream ``task:<task_id>``
    through a ``redis_streams`` consumer and relays every batch of new
    messages to this socket via the channel layer. The thread is told to
    stop when the socket disconnects, so it does not outlive the connection.
    """

    def __init__(self, *args, **kwargs):
        # Accept and forward arbitrary arguments so the consumer keeps
        # working if init kwargs are ever supplied through ``as_asgi(...)``.
        super().__init__(*args, **kwargs)
        self.task_id = self.task_group_name = None
        self.stream = None
        connection_pool = ConnectionPool(
            host="localhost",
            port=6379,
            db=0,
            decode_responses=True,
        )
        self.redis_connection = Redis(connection_pool=connection_pool)
        # Every message content seen so far on this connection; the whole
        # list is re-sent to the client on each update.
        self.messages = []
        # Polling thread state. A plain bool is sufficient because the
        # worker re-reads it on every loop iteration.
        self._running = False
        self._poll_thread = None

    def connect(self):
        """Join the per-connection group, accept the socket, start polling."""
        task_id = self.scope["url_route"]["kwargs"]["task_id"]
        self.task_id = task_id
        self.stream = f"task:{self.task_id}"
        # NOTE(review): the suffix has one-second resolution, so two sockets
        # opened for the same task within the same second would share a
        # group name -- confirm whether that is acceptable.
        self.task_group_name = (
            f"task-{self.task_id}-{round(timezone.now().timestamp())}"
        )
        async_to_sync(self.channel_layer.group_add)(
            self.task_group_name, self.channel_name
        )
        # Accept before starting the poller so that a message already
        # waiting in the stream cannot be pushed to a not-yet-accepted socket.
        self.accept()
        self._running = True
        # daemon=True: a stuck poller must never block interpreter shutdown.
        self._poll_thread = Thread(target=self.send_status, daemon=True)
        self._poll_thread.start()

    def disconnect(self, close_code):
        """Stop the polling thread and leave the channel-layer group."""
        # Signal the worker first; it exits after its current poll returns.
        self._running = False
        # The group name is only set once connect() has run far enough.
        if self.task_group_name is not None:
            async_to_sync(self.channel_layer.group_discard)(
                self.task_group_name, self.channel_name
            )

    def receive(self, text_data):
        """Ignore client messages; this socket is push-only."""
        pass

    def send_status(self):
        """Poll the Redis stream until disconnect and broadcast new messages.

        Runs in the thread started by ``connect``. Each non-empty batch is
        appended to ``self.messages``, removed from the stream, and then the
        full accumulated list is sent to the group as a ``task_message``
        event.
        """
        consumer = Consumer(
            redis_conn=self.redis_connection,
            stream=self.stream,
            consumer_group=self.task_group_name,
            batch_size=100,
            max_wait_time_ms=500,
        )
        while self._running:
            items = consumer.get_items()
            for message in items:
                self.messages.append(message.content["message"])
                consumer.remove_item_from_stream(item_id=message.msgid)
            # Re-check the flag: the socket may have closed while the poll
            # above was waiting for items.
            if items and self._running:
                async_to_sync(self.channel_layer.group_send)(
                    self.task_group_name,
                    {
                        "type": "task_message",
                        "message": self.messages,
                    },
                )

    def task_message(self, event):
        """Handle a ``task_message`` group event by sending it to the client."""
        self.send(text_data=json.dumps({"message": event["message"]}))

This seems to solve my problem, but I would like to know whether there is a better solution for achieving this, and whether running an infinite loop from a thread started in the consumer's connect method is the only way to do it.

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.