I am facing an issue where my Actix-web server starts correctly, but the moment I hit the /upload_chunk endpoint from my frontend, the request hangs forever. After some debugging, I found that the problem comes from a background task using tokio::spawn with an infinite loop and Arc<Mutex<...>>. If I comment out this spawned task, everything works perfectly.

/upload_chunk endpoint:

type RedisQueue = Arc<Mutex<RedisTaskQueue>>;

#[post("/upload_chunk")]
async fn upload_chunk(
    data: web::Json<ChunkData>,
    redis_queue: web::Data<RedisQueue>,
) -> impl Responder {
    // The guard returned by lock() is held until push_task() completes.
    match redis_queue
        .lock()
        .await
        .push_task(&data.encrypted_chunk)
        .await
    {
        Ok(_) => HttpResponse::Ok().json("Task queued successfully"),
        Err(_) => HttpResponse::InternalServerError().json("Failed to queue task"),
    }
}

RedisTaskQueue implementation:

pub struct RedisTaskQueue {
    connection: redis::aio::Connection,
    queue_name: String,
    uploader: IPFSUploader
}

impl RedisTaskQueue {
    pub fn new(connection: redis::aio::Connection, queue_name: &str, uploader: IPFSUploader) -> Self {
        //some code
    }

    pub async fn push_task(&mut self, data: &str) -> RedisResult<()> {
        self.connection.lpush(&self.queue_name, data).await
    }

    pub async fn pop_task(&mut self) -> RedisResult<()> {
        // BRPOP with a timeout of 0.0 waits indefinitely until the list has an item.
        let result: Option<(String, String)> =
            self.connection.brpop(&self.queue_name, 0.0).await?;

        if let Some((_, chunk)) = result {
            match self.uploader.upload_chunk(chunk).await {
                Ok(_) => println!("Chunks uploaded successfully!"),
                Err(_) => eprintln!("There was an error"),
            }
        }
        Ok(())
    }
}

main Function:

async fn main() -> std::io::Result<()> {
    let redis_queue = Arc::new(Mutex::new(RedisTaskQueue::new(
        connection,
        "task_queue",
        uploader,
    )));



    //spawned task starts

    tokio::spawn({
        let redis_queue = redis_queue.clone();
        async move {
            loop {
                match redis_queue.lock().await.pop_task().await {
                    Ok(_) => println!("Popped task"),
                    Err(e) => eprintln!("Error popping task: {}", e),
                }
            }
        }
    });

    //spawned task ends



    //...some code


    HttpServer::new(move || {
        App::new()
            .wrap(
                actix_cors::Cors::default()
                    .allowed_origin("http://localhost:5173")
                    .allowed_methods(vec!["GET", "POST"])
                    .allow_any_header(),
            )
            .app_data(web::Data::new(redis_queue.clone()))
            .service(upload_chunk)
    })
    .listen(listener)?
    .run()
    .await
}

Question

Why does the server hang when both the background loop and the endpoint try to lock the same Arc<Mutex<T>>? I expected Tokio's mutex to allow concurrent async tasks, but instead the request never completes.

Any help is appreciated!

I am very new to all this stuff, so I might be missing something obvious.

  • I think the issue is that pop_task() itself blocks. The spawned task locks the mutex and then waits inside pop_task() on a queue that is empty at that point, still holding the lock. Meanwhile, the handler waits on lock() so that it can push the task, so each side waits for the other. I can't find where the RedisTaskQueue you are using comes from, so I can't readily recommend a workaround, but I suspect it should take &self, or you need to clone the Redis handle so that you don't have to hold this kind of lock across a blocking call. Commented 22 hours ago
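
The mechanism described in the comment above can be reproduced without Redis or an async runtime. The following is only a sketch using std threads as a stand-in: `handler_can_lock` is a hypothetical helper, the `Mutex<()>` plays the role of `Arc<Mutex<RedisTaskQueue>>`, and a blocking `recv()` on an empty channel plays the role of `brpop` with a timeout of 0. It is an analogy for the Tokio case, not the asker's actual code.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: starts a consumer that waits on an empty queue and
/// reports whether a second party (the "handler") could take the shared lock
/// while the consumer is waiting.
fn handler_can_lock(consumer_holds_lock: bool) -> bool {
    // Stand-in for Arc<Mutex<RedisTaskQueue>>.
    let shared = Arc::new(Mutex::new(()));
    // Stand-in for the Redis list; it is empty, so recv() blocks like BRPOP.
    let (task_tx, task_rx) = mpsc::channel::<String>();
    // Used only so the check below runs after the consumer has started.
    let (ready_tx, ready_rx) = mpsc::channel::<()>();

    let consumer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            if consumer_holds_lock {
                // Pattern from the question: lock first, then wait for work.
                let _guard = shared.lock().unwrap();
                ready_tx.send(()).unwrap();
                let _ = task_rx.recv(); // guard is still held here
            } else {
                // Alternative: wait for work without touching the shared lock.
                ready_tx.send(()).unwrap();
                let _ = task_rx.recv();
            }
        })
    };

    ready_rx.recv().unwrap();
    // The handler's view: can it get the lock to push a task right now?
    let can_lock = shared.try_lock().is_ok();

    // Unblock the consumer so the thread can exit cleanly.
    task_tx.send("chunk".to_string()).unwrap();
    consumer.join().unwrap();
    can_lock
}
```

Calling `handler_can_lock(true)` models the code in the question: the lock is unavailable for as long as the consumer waits, and in the real server nothing can push a task to end that wait. Calling `handler_can_lock(false)` models giving the consumer its own handle (for example a separate Redis connection), so the wait never happens under the shared lock.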
