Real-time Celery task updates with Redis PUB/SUB and WebSocket

Tram Ho

I wrote this article mainly to share what I learned while researching the development of 3D web applications. In these applications, time-consuming, computation-heavy tasks such as rendering, extracting data from images, … are handled on the server side, which stores and updates their progress so the client can monitor it.

The system I use to handle tasks submitted by users is laid out as follows.

System requirements:

Functional requirements:

    • The user uploads supported asset types (3D files: .fbx, .obj …; 2D files: Substance).
    • The system runs tasks according to the type of asset uploaded.
    • The tasks run in the background and in parallel.
    • The user receives real-time updates on task status.

In this article I do not go into detail on modules such as Redis, RabbitMQ, Celery, and FastAPI; you can find plenty of tutorials on the internet. Instead, I share how my team connected these modules into one system to solve the problem described above.

Ok, let’s go to work.

Modules

Celery

First, a short introduction to Celery. For Python developers, Celery is a well-known distributed task queue. Basically, it distributes tasks to workers, with an intermediary broker that receives tasks from the sending server. Commonly used brokers are RabbitMQ and Redis. Here I use RabbitMQ as the broker, and Redis's PUB/SUB feature to publish task status to the client.
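To make the broker/worker split concrete, here is a toy, single-process sketch using only the standard library: a queue.Queue stands in for the RabbitMQ broker, threads stand in for Celery workers, and a shared list stands in for the status messages that would be published to Redis. None of these names come from Celery; the sketch only illustrates the flow of work.

```python
import queue
import threading
from typing import List, Optional, Tuple

Task = Tuple[str, str]  # (task_id, filename)


def worker(broker: "queue.Queue[Optional[Task]]",
           statuses: List[Tuple[str, str]],
           lock: threading.Lock) -> None:
    """Consume tasks from the broker until a None sentinel arrives."""
    while True:
        item = broker.get()
        if item is None:  # sentinel: no more work for this worker
            break
        task_id, _filename = item
        with lock:
            statuses.append((task_id, "STARTED"))
        # The time-consuming work (render, extract data, ...) would run here.
        with lock:
            statuses.append((task_id, "SUCCESS"))


def run(filenames: List[str], n_workers: int = 2) -> List[Tuple[str, str]]:
    """Enqueue one task per file and let n_workers threads process them in parallel."""
    broker: "queue.Queue[Optional[Task]]" = queue.Queue()
    statuses: List[Tuple[str, str]] = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(broker, statuses, lock))
               for _ in range(n_workers)]
    for thread in threads:
        thread.start()
    for i, filename in enumerate(filenames):
        broker.put((f"task-{i}", filename))
    for _ in threads:
        broker.put(None)  # one sentinel per worker, queued after all real tasks
    for thread in threads:
        thread.join()
    return statuses
```

Calling run(["a.fbx", "b.obj"]) returns a STARTED and a SUCCESS entry per task; which thread picks up which task is not deterministic, just as with real workers.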

At the core of Celery are the tasks executed by the workers. A regular function becomes a Celery task through the @app.task decorator (or @shared_task).
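A sketch of what such a task body might look like, assuming each task processes its asset in steps and publishes a JSON progress message after each one. The function is kept free of Celery so it runs on its own; the publish argument has the same (channel, message) shape as redis-py's Redis.publish, and the message fields (task_id, state, progress) are hypothetical.

```python
import json
from typing import Callable, List

# In the worker this would be registered with Celery, for example:
#     @app.task(bind=True)
#     def process_asset(self, ...): ...
# with the task-ID taken from self.request.id. It is a plain function here so
# the progress logic can run without a broker.


def process_asset(task_id: str, channel: str, steps: List[str],
                  publish: Callable[[str, str], object]) -> dict:
    """Run the asset's processing steps, publishing a JSON progress message after each."""
    total = len(steps)
    for done, _step in enumerate(steps, start=1):
        # ... the heavy work for this step (render, extract, ...) would go here ...
        publish(channel, json.dumps({
            "task_id": task_id,
            "state": "SUCCESS" if done == total else "PROGRESS",
            "progress": round(100 * done / total),
        }))
    return {"task_id": task_id, "steps": total}
```

Celery also offers self.update_state(state="PROGRESS", meta=...), which stores progress in the result backend where a client has to poll for it; publishing to a channel instead pushes each update as it happens, which is what the WebSocket side relies on.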

Here, for each task we record its task-ID and combine it with the session-ID (our sessions use JWT, JSON Web Token); together the two IDs form the name of the channel used with Redis PUB. Since the user can submit multiple files and each file corresponds to a task executed in the background, the channel needs the task-ID, and the server later uses PSUBSCRIBE (pattern subscribe) to listen to all of those channels at once. It sounds a bit convoluted, but the session-ID is what lets the WebSocket send information back to the user who submitted the tasks.
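The channel naming scheme can be sketched as below, assuming a hypothetical task:{session_id}:{task_id} format. Redis pattern subscriptions use glob-style patterns, so fnmatch.fnmatchcase from the standard library serves here as a close (not exact) stand-in for the matching Redis performs.

```python
from fnmatch import fnmatchcase
from typing import Tuple

PREFIX = "task"  # hypothetical channel prefix


def task_channel(session_id: str, task_id: str) -> str:
    """Channel that a single task publishes its status on."""
    return f"{PREFIX}:{session_id}:{task_id}"


def session_pattern(session_id: str) -> str:
    """PSUBSCRIBE pattern covering every task channel of one session."""
    return f"{PREFIX}:{session_id}:*"


def parse_channel(channel: str) -> Tuple[str, str]:
    """Split a channel name back into (session_id, task_id)."""
    prefix, session_id, task_id = channel.split(":", 2)
    if prefix != PREFIX:
        raise ValueError(f"not a task channel: {channel!r}")
    return session_id, task_id


# Usage: the pattern for session s1 matches its own task channels only.
assert fnmatchcase(task_channel("s1", "t42"), session_pattern("s1"))
```

The colon after the session-ID matters: without it, a pattern for session s1 would also match channels of session s10. The sketch assumes IDs contain no glob characters (*, ?, [) or colons, which holds for UUIDs.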

Hopefully the layout above makes it clear why both the session-ID and the task-ID are needed.

The next part is simply configuring the Celery application. Start by defining the configuration.

The Celery application is then created from this configuration object.

Finally, start the Celery worker so that it is ready to accept tasks when required.
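The configuration step above could look roughly like the object below, assuming Celery 4+ lowercase setting names. The URLs and environment variable names are placeholders for a local setup, and the Redis result backend is an assumption: the article only says Redis is used for PUB/SUB.

```python
import os


class Config:
    """Celery settings object (lowercase setting names, Celery 4+ style)."""

    # RabbitMQ as the message broker, as in the article (placeholder URL).
    broker_url = os.environ.get("CELERY_BROKER_URL", "amqp://guest:guest@localhost:5672//")
    # Assumption: Redis also stores task results (placeholder URL).
    result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
    task_serializer = "json"
    result_serializer = "json"
    accept_content = ["json"]
    # Report the STARTED state so queued and running tasks can be told apart.
    task_track_started = True
    # Reserve one task per worker process at a time (suited to long renders).
    worker_prefetch_multiplier = 1
    timezone = "UTC"
    enable_utc = True
```

With Celery installed, the application would be created with app = Celery("tasks") followed by app.config_from_object(Config), and a worker started from the shell with celery -A tasks worker --loglevel=INFO, where tasks is a hypothetical module name. The Celery documentation suggests a prefetch multiplier of one for long-running tasks, so one worker does not hold several renders in reserve while others sit idle.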

FastAPI

This section briefly covers FastAPI's HTTP requests and WebSocket, starting with the HTTP requests the user sends to have tasks performed.
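Before an HTTP handler can enqueue work, it has to decide which task each uploaded file needs. The sketch below shows that step in plain Python; the extension-to-task mapping and the task names are hypothetical, and .sbsar is assumed as the Substance file extension.

```python
import uuid
from pathlib import PurePath
from typing import Dict, List

# Hypothetical mapping from file extension to the Celery task that handles it.
TASKS_BY_EXTENSION: Dict[str, str] = {
    ".fbx": "process_3d_model",
    ".obj": "process_3d_model",
    ".sbsar": "process_substance",
}


def plan_tasks(filenames: List[str]) -> List[Dict[str, str]]:
    """Return one task description per uploaded file; reject unsupported types."""
    planned = []
    for name in filenames:
        task_name = TASKS_BY_EXTENSION.get(PurePath(name).suffix.lower())
        if task_name is None:
            raise ValueError(f"unsupported asset type: {name}")
        planned.append({
            "task_id": str(uuid.uuid4()),  # generated up front so it can be returned immediately
            "task": task_name,
            "file": name,
        })
    return planned
```

Generating the task-ID on the server before enqueueing means the HTTP response can return it right away; with Celery it can then be passed along as apply_async(..., task_id=task_id), so the worker, the channel name, and the client all agree on the same ID.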

WebSocket connection:

The WebSocket connects only after the user is authenticated, so the WebSocket URL includes the session-ID, which is created when the user logs in and returned by the server as a JSON Web Token.
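Assuming the value carried in the WebSocket URL is the JWT returned at login, the server should verify it before trusting the session-ID inside it. The standard-library sketch below issues and verifies an HS256 token; the sid claim name is an assumption, and in practice a maintained library such as PyJWT (jwt.encode / jwt.decode with algorithms=["HS256"]) is the safer choice.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()


def issue_token(session_id: str, secret: bytes, ttl_seconds: int = 3600) -> str:
    """Create an HS256 JWT carrying the session-ID in a hypothetical 'sid' claim."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sid": session_id, "exp": int(time.time()) + ttl_seconds}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url_encode(_sign(signing_input, secret))}"


def session_from_token(token: str, secret: bytes) -> str:
    """Verify signature and expiry, then return the session-ID (ValueError if invalid)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    # compare_digest avoids leaking how many signature bytes matched via timing.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return payload["sid"]
```

In a FastAPI route declared with a hypothetical path such as @app.websocket("/ws/{token}"), the handler would call session_from_token(token, SECRET) and close the connection if it raises, before subscribing to anything.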

To keep synchronous Redis SUB calls from blocking the web server, I use the aioredis module to create a pattern subscription on the Redis server.
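The point of an async client is that waiting for the next message is an await rather than a blocking call, so one event loop can serve many WebSockets. The sketch below imitates PUBLISH/PSUBSCRIBE with asyncio queues so it runs without a Redis server; InMemoryPubSub is hypothetical and is not the aioredis API. (aioredis has since been merged into redis-py as redis.asyncio, where the equivalent is roughly pubsub = client.pubsub(), await pubsub.psubscribe(pattern), then async for message in pubsub.listen().)

```python
import asyncio
from fnmatch import fnmatchcase
from typing import List, Tuple


class InMemoryPubSub:
    """Tiny asyncio stand-in for Redis PUBLISH / PSUBSCRIBE (illustration only)."""

    def __init__(self) -> None:
        self._subscribers: List[Tuple[str, asyncio.Queue]] = []

    def psubscribe(self, pattern: str) -> asyncio.Queue:
        """Register a glob pattern; matching messages land in the returned queue."""
        inbox: asyncio.Queue = asyncio.Queue()
        self._subscribers.append((pattern, inbox))
        return inbox

    def publish(self, channel: str, message: str) -> int:
        """Deliver to every matching subscriber; return the receiver count, as Redis does."""
        receivers = 0
        for pattern, inbox in self._subscribers:
            if fnmatchcase(channel, pattern):
                inbox.put_nowait((channel, message))
                receivers += 1
        return receivers


async def demo() -> List[Tuple[str, str]]:
    bus = InMemoryPubSub()
    inbox = bus.psubscribe("task:s1:*")
    received: List[Tuple[str, str]] = []

    async def reader() -> None:
        for _ in range(2):
            received.append(await inbox.get())  # awaits without blocking the event loop

    async def fake_worker() -> None:
        for task_id in ("t1", "t2"):
            await asyncio.sleep(0)  # yield control, as real I/O would
            bus.publish(f"task:s1:{task_id}", "SUCCESS")
        bus.publish("task:s2:t9", "SUCCESS")  # another session: not delivered

    await asyncio.gather(reader(), fake_worker())
    return received
```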

You can choose the pattern yourself; the idea is to fix the session-ID and match any task-ID, so the subscription receives all messages from the tasks the user submitted in that session. Each time a message arrives from a Celery task, the WebSocket forwards it to the client. The client-side web application then updates from the WebSocket and displays the progress of the corresponding task; I will cover this part in more detail in the next section.
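The forwarding loop can be sketched as follows, assuming messages arrive as (channel, JSON string) pairs from the pattern subscription and that the loop may stop once every submitted task reaches a final state (the article does not say when the loop ends). The websocket object only needs an async send_json method, which FastAPI's WebSocket provides; RecordingWebSocket is a hypothetical test double.

```python
import asyncio
import json
from typing import Any, Dict, Iterable, List, Tuple

FINAL_STATES = {"SUCCESS", "FAILURE"}


async def relay(inbox: asyncio.Queue, websocket: Any, task_ids: Iterable[str]) -> Dict[str, dict]:
    """Forward each task message to the websocket until every task has finished.

    The pattern subscription already filters by session, so the channel is not
    inspected. Returns the latest message seen per task.
    """
    latest: Dict[str, dict] = {}
    pending = set(task_ids)
    while pending:
        _channel, raw = await inbox.get()
        message = json.loads(raw)
        await websocket.send_json(message)  # push to the client as soon as it arrives
        latest[message["task_id"]] = message
        if message["state"] in FINAL_STATES:
            pending.discard(message["task_id"])
    return latest


class RecordingWebSocket:
    """Hypothetical test double with the same send_json coroutine name as FastAPI's WebSocket."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    async def send_json(self, data: Any) -> None:
        self.sent.append(data)


async def demo() -> Tuple[List[Any], Dict[str, dict]]:
    inbox: asyncio.Queue = asyncio.Queue()
    for task_id, state, progress in [("t1", "PROGRESS", 50), ("t2", "SUCCESS", 100), ("t1", "SUCCESS", 100)]:
        body = json.dumps({"task_id": task_id, "state": state, "progress": progress})
        inbox.put_nowait((f"task:s1:{task_id}", body))
    websocket = RecordingWebSocket()
    latest = await relay(inbox, websocket, {"t1", "t2"})
    return websocket.sent, latest
```

A real handler would also need to cope with the client disconnecting (FastAPI signals this with WebSocketDisconnect) and should unsubscribe from Redis in a finally block so abandoned sessions do not keep their subscriptions open.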

Epilogue

Thank you for reading this part. I hope it provided some useful information. To be honest, I picked up these pieces here and there on the internet and customized them to the requirements of our system. My team and I welcome all feedback and discussion.


Source : Viblo