
I have a program that is largely synchronous; only for certain operations does it need to perform async calls from a private library. So I'm building a new tokio runtime, spawning a worker task on it, and passing the data from that task to my sync world through a channel, calling recv() on the receiving end in a blocking manner.

Greatly simplified, the problematic part of the code looks like this:

use std::sync::mpsc::{channel, Sender};

struct Worker {
    chan: Sender<bool>
}

impl Worker {
    async fn do_work(&self) {
        loop {
            // do some more async work
            self.chan.send(true).unwrap();
        }
    }
}

fn main() {
    let (tx, rx) = channel::<bool>();
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    
    let _ = rt.spawn(async move {
        let worker = Worker { chan: tx };
        worker.do_work().await;
    });
    
    println!("received {}", rx.recv().unwrap());
}

On playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=156e86c82839ef9d2a50fbcb8a67b412

But this code does not compile, and the relevant message is:

error: future cannot be sent between threads safely
within `Worker`, the trait `Sync` is not implemented for `std::sync::mpsc::Sender<bool>`
note: future is not `Send` as this value is used across an await

The channel is only passed to one thread, and cloning the sender as in this question does not solve it. Similarly, using an async channel is not an option, since the receiver is sync. Instead, it might have to do with how the sender is contained within the worker object, as the error message suggests. But in my case the worker object needs to do other things and call the method that calls send() multiple times, among other async operations. It needs to be wrapped in an object because it is passed to another library function (it is always moved, though, never passed by reference).

What can I do to make this work?

3 Comments

  • Note that your code deadlocks. With the current-thread scheduler, you need to call block_on() to start the runtime.
  • @ChayimFriedman In that case, how can I run the task in a detached manner, so that I can wait on recv() while the worker does its work and every now and then sends something to the channel?
  • Spawn a different thread and use block_on() in it (see the sketch below).
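To make that suggestion concrete, here is a minimal sketch based on the question's code (the worker loop is trimmed to a single send for brevity). Because block_on() imposes no Send bound on its future, the std Sender compiles unchanged:

use std::sync::mpsc::{channel, Sender};
use std::thread;

struct Worker {
    chan: Sender<bool>,
}

impl Worker {
    async fn do_work(&self) {
        // do some async work, then report back
        self.chan.send(true).unwrap();
    }
}

fn main() {
    let (tx, rx) = channel::<bool>();

    // Drive the runtime on a dedicated thread so main() can block on recv().
    thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            let worker = Worker { chan: tx };
            worker.do_work().await;
        });
    });

    println!("received {}", rx.recv().unwrap());
}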

1 Answer


The hard way

Never hold a reference to the channel or the worker between await points. For example, take self instead of &self in your code.
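As a sketch against the question's code, that change would look like this; the spawned future then owns the Worker outright, so it only needs Worker: Send (which holds, since Sender<T> is Send), rather than Worker: Sync for a borrow held across .await:

impl Worker {
    // Taking self by value: the future stores the Worker itself,
    // not a &Worker, so no Sync bound is required.
    async fn do_work(self) {
        loop {
            // do some more async work
            self.chan.send(true).unwrap();
        }
    }
}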

The easy way

Use an async channel. tokio's channels have a blocking_recv() method you can use in synchronous contexts.
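A minimal sketch of that approach, using tokio's mpsc channel (the buffer size is an arbitrary choice here) and a multi-thread runtime so the spawned task runs without block_on():

use tokio::sync::mpsc;

fn main() {
    let (tx, mut rx) = mpsc::channel::<bool>(16);
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.spawn(async move {
        // async worker logic goes here...
        tx.send(true).await.unwrap();
    });

    // blocking_recv() bridges back into the synchronous world;
    // it must not be called from within an async context.
    println!("received {}", rx.blocking_recv().unwrap());
}

Note that tokio's Sender is Send + Sync, so a Worker holding it stays Send even across await points.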

Or use a different channel implementation, such as crossbeam's, whose Sender is Sync.
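That swap would look roughly like this, assuming the crossbeam-channel crate as a dependency; its API mirrors std's, so the rest of the question's code stays the same:

use crossbeam_channel::unbounded;

fn main() {
    // crossbeam's Sender is Sync, so a Worker holding it stays Send
    // even when &self is held across await points.
    let (tx, rx) = unbounded::<bool>();
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.spawn(async move {
        tx.send(true).unwrap();
    });

    println!("received {}", rx.recv().unwrap());
}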


5 Comments

  • This makes me wonder why std::sync::mpsc::channel exists at all. Is there anything it does better than crossbeam? And didn't they copy it in to fix that horrific bug a while back, or am I remembering incorrectly?
  • @KevinAnderson Yes, they copied crossbeam's implementation, but left the API as-is, although there was talk of relaxing it in the future. Some indeed think that including a channel implementation in std was a mistake, but now it's too late to fix :)
  • I also came up with taking self instead of &self, but it still gave me the same error. Using a tokio channel did solve it for me. I think it is strange that you cannot share the std Sender across threads.
  • @BorisMulder In the code you provided, using self works. I don't know your actual code; it may not be enough there.
  • In my case it wasn't, because the Sender was contained in a sub-struct that was passed by reference across multiple awaits. Using another channel implementation did solve it, though (a sketch of that pattern follows below).
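The commenter's actual code isn't shown; the following is a hypothetical reconstruction of that pattern (the Inner struct name is made up) showing why taking self by value alone doesn't help when a nested reference is held across an await:

use std::sync::mpsc::Sender;

struct Inner {
    chan: Sender<bool>,
}

struct Worker {
    inner: Inner,
}

impl Worker {
    async fn do_work(self) {
        // Even with self taken by value, holding &self.inner across an
        // await embeds an &Inner in the future, which is Send only if
        // Inner (and hence Sender) is Sync. A Sync sender (tokio's or
        // crossbeam's) lifts that requirement.
        let inner = &self.inner;
        tokio::task::yield_now().await;
        inner.chan.send(true).unwrap();
    }
}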
