I'd like a function that asynchronously processes a variable number of (Sink, Stream) tuples.

use futures::channel::mpsc;
use futures::{Sink, Stream, SinkExt, StreamExt};

async fn foo(v: Vec<(Box<dyn Sink<Error = std::io::Error>>, Box<dyn Stream<Item = u8>>)>) {
    for (mut tx, mut rx) in v {
        let _ = tx.send(0);
        let _ = rx.next().await;
    }
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel(32);
    foo(vec![(Box::new(tx), Box::new(rx))]).await;
    
    Ok(())
}

But I get this compilation error:

error[E0107]: wrong number of type arguments: expected 1, found 0
 --> src/main.rs:4:30
  |
4 | async fn foo(v: Vec<(Box<dyn Sink<Error = std::io::Error>>, Box<dyn Stream<Item = u8>>)>) {
  |                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected 1 type argument

I was prompted to declare the associated type for the trait object that way by the compiler itself. I'm unsure why it does not accept it.

1 Answer

The compiler wants you to specify the "type argument" of the Sink. This is not the error type, but the type of the item being sent down the sink, as in Sink<Foo>. You specify u8 as the type of the stream, and are sending the value unchanged between one and the other, so you probably want a Sink<u8>.
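To make the distinction between a type argument and an associated type concrete, here is a minimal std-only sketch. MiniSink, ByteLog, and push_all are hypothetical names invented for illustration; MiniSink is a simplified, synchronous stand-in and not the real futures::Sink trait.

```rust
use std::convert::Infallible;

// Hypothetical, simplified stand-in for futures::Sink.
// `Item` is a generic type parameter; `Error` is an associated type.
trait MiniSink<Item> {
    type Error;
    fn push(&mut self, item: Item) -> Result<(), Self::Error>;
}

// A toy sink that collects bytes and never fails.
struct ByteLog(Vec<u8>);

impl MiniSink<u8> for ByteLog {
    type Error = Infallible;

    fn push(&mut self, item: u8) -> Result<(), Self::Error> {
        self.0.push(item);
        Ok(())
    }
}

// A trait object has to name both pieces: the type parameter positionally
// (`u8`) and the associated type in `Name = Type` form (`Error = ...`).
// Writing only `dyn MiniSink<Error = Infallible>` would fail with E0107.
fn push_all(sink: &mut dyn MiniSink<u8, Error = Infallible>, items: &[u8]) -> usize {
    let mut pushed = 0;
    for &byte in items {
        if sink.push(byte).is_ok() {
            pushed += 1;
        }
    }
    pushed
}
```

Calling push_all(&mut ByteLog(Vec::new()), &[1, 2, 3]) coerces the concrete sink to the trait object, the same way Box::new(tx) is coerced in the question.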

Once you do that, the compiler will next complain that you need to specify the Error associated type (this time for real). However, if you specify std::io::Error, the call to foo() from main() won't compile, because the Sink implementation for mpsc::Sender uses its own mpsc::SendError as the error type.

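Finally, both the sink and the stream need to be pinned. SinkExt::send() and StreamExt::next() require the receiver to be Unpin, and Box<dyn Sink<...>> only implements Sink when the boxed type is itself Unpin, which a bare trait object does not guarantee. Using Pin<Box<...>> instead of Box<...>, and Box::pin(...) instead of Box::new(...), satisfies that requirement.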

With the above changes, a version that compiles looks like this:

use futures::channel::mpsc;
use futures::{Sink, SinkExt, Stream, StreamExt};
use std::pin::Pin;

async fn foo(
    v: Vec<(
        Pin<Box<dyn Sink<u8, Error = mpsc::SendError>>>,
        Pin<Box<dyn Stream<Item = u8>>>,
    )>,
) {
    for (mut tx, mut rx) in v {
        // Await the send; without .await the future is dropped and nothing is sent.
        let _ = tx.send(0).await;
        let _ = rx.next().await;
    }
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = mpsc::channel(32);
    foo(vec![(Box::pin(tx), Box::pin(rx))]).await;

    Ok(())
}
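
The pinning step can also be seen in isolation using only the standard library. The following is a rough sketch by analogy, using std's Future in place of Sink and Stream; NoopWake, Always, BoxedFuture, poll_all, and demo are hypothetical names, and the no-op waker is only adequate here because both futures are ready on the first poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; sufficient for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// A hand-written future that is always ready with its stored byte.
struct Always(u8);

impl Future for Always {
    type Output = u8;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<u8> {
        Poll::Ready(self.0)
    }
}

// Different concrete future types stored behind one pinned trait-object type.
type BoxedFuture = Pin<Box<dyn Future<Output = u8>>>;

fn poll_all(mut futures: Vec<BoxedFuture>) -> Vec<u8> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut ready = Vec::new();
    for fut in futures.iter_mut() {
        // as_mut() reborrows Pin<Box<T>> as Pin<&mut T>, which poll() requires.
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            ready.push(value);
        }
    }
    ready
}

fn demo() -> Vec<u8> {
    // Box::pin(...) produces Pin<Box<T>>, which coerces to the trait object.
    let a: BoxedFuture = Box::pin(Always(1));
    let b: BoxedFuture = Box::pin(std::future::ready(2u8));
    poll_all(vec![a, b])
}
```

The same Box::pin coercion is what lets main() above pass an mpsc::Sender and an mpsc::Receiver where foo() expects pinned trait objects.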
3 Comments

This appears to require foo() to accept only vectors of (Sink<u8>, Stream<u8>) tuples instead of the more generic case my program requires. I'd like to pass a vector of all sorts of (tx, rx) mpsc pairs and other I/O streams (with the common attribute that they all implement Sink or Stream), which is why I'm using trait objects. My parameter v acts like components in the Trait Objects chapter of the Rust Book (doc.rust-lang.org/book/ch17-02-trait-objects.html). I suspect Rust can handle this case within a function scope, but breaks when consuming it as a parameter?
@armani Your foo() uses tx.send(0), so it does seem to require numbers. Rust can handle sending heterogeneous objects in streams, but you need to be more precise with requirements. As it stands, the question is not really answerable.
Enough pain; I'll choose the Bytes type from the bytes crate and figure out its parsing later. This lets me resolve my original error.
