I am facing an issue where my Actix-web server starts correctly, but the moment I hit the /upload_chunk endpoint from my frontend, the request hangs forever.
After some debugging, I found that the problem comes from a background task using tokio::spawn with an infinite loop and Arc<Mutex<...>>.
If I comment out this spawned task, everything works perfectly.
/upload_chunk endpoint:
type RedisQueue = Arc<Mutex<RedisTaskQueue>>;
#[post("/upload_chunk")]
async fn upload_chunk(
    data: web::Json<ChunkData>,
    redis_queue: web::Data<RedisQueue>,
) -> impl Responder {
    let result = match redis_queue
        .lock()
        .await
        .push_task(&data.encrypted_chunk)
        .await
    {
        Ok(_) => HttpResponse::Ok().json("Task queued successfully"),
        Err(_) => HttpResponse::InternalServerError().json("Failed to queue task"),
    };
    result
}
RedisTaskQueue implementation:
pub struct RedisTaskQueue {
    connection: redis::aio::Connection,
    queue_name: String,
    uploader: IPFSUploader,
}
impl RedisTaskQueue {
    pub fn new(connection: redis::aio::Connection, queue_name: &str, uploader: IPFSUploader) -> Self {
        //some code
    }
    pub async fn push_task(&mut self, data: &str) -> RedisResult<()> {
        self.connection.lpush(&self.queue_name, data).await
    }
    pub async fn pop_task(&mut self) -> RedisResult<()> {
        let result: Option<(String, String)> = self.connection.brpop(&self.queue_name, 0.0).await?;
        if let Some((_, chunk)) = result {
            match self.uploader.upload_chunk(chunk).await {
                Ok(_) => print!("Chunks uploaded Successfully!"),
                Err(_) => print!("There was an error"),
            }
        }
        Ok(())
    }
}
main Function:
async fn main() -> std::io::Result<()> {
    let redis_queue = Arc::new(Mutex::new(RedisTaskQueue::new(
        connection,
        "task_queue",
        uploader,
    )));
    //spawned task starts
    tokio::spawn({
        let redis_queue = redis_queue.clone();
        async move {
            loop {
                match redis_queue.lock().await.pop_task().await {
                    Ok(_) => println!("Popped task"),
                    Err(e) => eprintln!("Error popping task: {}", e),
                }
            }
        }
    });
    //spawned task ends
    //...some code
    HttpServer::new(move || {
        App::new()
            .wrap(
                actix_cors::Cors::default()
                    .allowed_origin("http://localhost:5173")
                    .allowed_methods(vec!["GET", "POST"])
                    .allow_any_header(),
            )
            .app_data(web::Data::new(redis_queue.clone()))
            .service(upload_chunk)
    })
    .listen(listener)?
    .run()
    .await
}
Question
Why does the server hang when both the background loop and the endpoint try to lock the same Arc<Mutex<T>>?
I expected Tokio's mutex to allow concurrent async tasks, but instead the request never completes.
Any help is appreciated!
I am very new to all this stuff, so I might be missing something obvious.
pop_task() itself blocks: BRPOP with a timeout of 0 waits until an item arrives. So the spawned task locks the mutex and then sits inside pop_task() on a queue that is empty at the time, with the mutex still locked. Meanwhile, the handler waits on lock() so that it can push a task, and that push is the only thing that would wake the spawned task, so each waits for the other. I can't find where the RedisTaskQueue you are using comes from, so I can't readily recommend a workaround, but I suspect it should take &self, or you need to clone the Redis handle so that you don't need this kind of lock around a blocking call.
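To make the "separate handles, no lock held across the blocking call" idea concrete, here is a minimal sketch using only the standard library. It is not your Redis code: plain threads stand in for Tokio tasks, a Condvar-backed queue stands in for the Redis list and BRPOP, and the names QueueHandle and run_demo are made up for illustration. In the real application the equivalent would presumably be giving the worker its own Redis connection, which is an assumption on my part since I can't see how your connection is created.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for a Redis connection: every clone is a separate
/// handle onto the same queue, so no outer Mutex is needed to share it.
#[derive(Clone)]
struct QueueHandle {
    inner: Arc<(Mutex<VecDeque<String>>, Condvar)>,
}

impl QueueHandle {
    fn new() -> Self {
        QueueHandle {
            inner: Arc::new((Mutex::new(VecDeque::new()), Condvar::new())),
        }
    }

    /// Takes `&self`. The internal lock is held only for the push itself.
    fn push_task(&self, data: &str) {
        let (queue, ready) = &*self.inner;
        queue.lock().unwrap().push_back(data.to_string());
        ready.notify_one();
    }

    /// Blocks until an item is available (roughly what BRPOP with timeout 0
    /// does). `Condvar::wait` releases the internal lock while waiting, so a
    /// blocked consumer never shuts out a producer.
    fn pop_task(&self) -> String {
        let (queue, ready) = &*self.inner;
        let mut guard = queue.lock().unwrap();
        loop {
            if let Some(item) = guard.pop_front() {
                return item;
            }
            guard = ready.wait(guard).unwrap();
        }
    }
}

/// Starts a worker on its own handle, pushes `tasks` from the caller's
/// handle, and returns what the worker popped, in order.
fn run_demo(tasks: &[&str]) -> Vec<String> {
    let producer = QueueHandle::new();
    let consumer = producer.clone();
    let expected = tasks.len();

    // The worker may already be blocked in pop_task() when the pushes start.
    let worker = thread::spawn(move || {
        let mut seen = Vec::new();
        while seen.len() < expected {
            seen.push(consumer.pop_task());
        }
        seen
    });

    for task in tasks {
        producer.push_task(task);
    }
    worker.join().unwrap()
}

fn main() {
    let popped = run_demo(&["chunk-1", "chunk-2", "chunk-3"]);
    println!("worker popped {} tasks: {:?}", popped.len(), popped);
}
```

The point of the sketch is the shape, not the queue itself: the worker blocks on its own handle, the producer pushes through another one, and nothing that both sides need stays locked while the worker waits. With your current layout, the outer Arc<Mutex<RedisTaskQueue>> is exactly the thing that stays locked.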