
I am using hyper 0.12 to build a proxy service. When receiving a response body from the upstream server I want to forward it back to the client ASAP, and save the contents in a buffer for later processing.

So I need a function that:

  • takes a Stream (a hyper::Body, to be precise)
  • returns a Stream that is functionally identical to the input stream
  • also returns some sort of Future<Item = Vec<u8>, Error = ...> that is resolved with the buffered contents of the input stream, when the output stream is completely consumed

I can't for the life of me figure out how to do this.

I guess the function I'm looking for will look something like this:

use futures::{future, Future, Stream};

type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let body2 = ... // ???
    let buffer = body.map_err(|_| ()).fold(Vec::<u8>::new(), |mut buf, chunk| {
        buf.extend_from_slice(&chunk);
        // ...somehow send this chunk to body2 also?
        future::ok::<_, ()>(buf)
    });
    (body2, Box::new(buffer))
}

Below is what I have tried, and it works until send_data() fails (obviously).

use futures::{future, Future, Stream};

type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let (mut sender, body2) = hyper::Body::channel();
    let consume =
        body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
            buf.extend_from_slice(&chunk);

            // What to do if this fails?
            if sender.send_data(chunk).is_err() {}
            future::ok::<_, ()>(buf)
        });

    (body2, Box::new(consume))
}

However, something tells me I am on the wrong track.

I have found Sink.fanout() which seems like it is what I want, but I do not have a Sink, and I don't know how to construct one. hyper::Body implements Stream but not Sink.

  • What behavior should happen if the second future is polled before the first stream finishes? Commented Aug 7, 2018 at 16:22
  • I would try to Stream.map() the body. The buffer will only be filled when the stream is consumed, but if you want to send the stream quickly that's what you need to do anyway. Commented Aug 7, 2018 at 16:27
  • @Shepmaster Then the future should not yet be ready; it should complete only once the entire stream has been read or passed through. Commented Aug 7, 2018 at 17:00
  • @JanHudec How can I then return a future that completes once the stream is finished while also returning the stream itself to hyper? It seems all the related methods consume the stream... Commented Aug 7, 2018 at 17:00
  • @molf, that's what I am not sure about. I think one side or the other needs to manage the pulling. Commented Aug 7, 2018 at 17:24

2 Answers


What I ended up doing was implementing a new type of stream that does what I need. This appeared to be necessary because hyper::Body does not implement Sink, nor does hyper::Chunk implement Clone (which Sink.fanout() requires), so I could not use any of the existing combinators.

First, a struct that holds the state we need, with methods to append a new chunk and to signal that the buffer is complete.

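use futures::sync::oneshot;
use futures::{Async, Future, Stream};

struct BodyClone<T> {
    body: T,
    // Copy of everything read so far; `None` once it has been sent.
    buffer: Option<Vec<u8>>,
    // Delivers the finished buffer to whoever holds the receiver.
    sender: Option<oneshot::Sender<Vec<u8>>>,
}

impl BodyClone<hyper::Body> {
    // Send the buffer at most once: `take()` leaves `None` behind.
    fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            // An error only means the receiver was dropped; nothing to do.
            let _ = sender.send(buffer);
        }
    }

    // Append a chunk and flush early once Content-Length bytes have arrived.
    fn push(&mut self, chunk: &hyper::Chunk) {
        use hyper::body::Payload;

        let length = if let Some(buffer) = self.buffer.as_mut() {
            buffer.extend_from_slice(chunk);
            buffer.len() as u64
        } else {
            0
        };

        if let Some(content_length) = self.body.content_length() {
            if length >= content_length {
                self.flush();
            }
        }
    }
}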
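The push/flush bookkeeping can be checked without hyper. Below is a minimal std-only sketch, assuming `std::sync::mpsc` as a stand-in for `futures::sync::oneshot` and a hypothetical `expected_len` field in place of `Payload::content_length()`; `Collector` is an illustrative name, not part of either library.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical std-only model of `BodyClone::push`/`flush`.
pub struct Collector {
    // Stand-in for the body's Content-Length (assumption).
    expected_len: Option<u64>,
    buffer: Option<Vec<u8>>,
    sender: Option<Sender<Vec<u8>>>,
}

impl Collector {
    pub fn new(expected_len: Option<u64>) -> (Self, Receiver<Vec<u8>>) {
        let (sender, receiver) = channel();
        let collector = Collector {
            expected_len,
            buffer: Some(Vec::new()),
            sender: Some(sender),
        };
        (collector, receiver)
    }

    /// Sends the buffer at most once; later calls find `None` and do nothing.
    pub fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            // Ignore the error: it only means the receiver was dropped.
            let _ = sender.send(buffer);
        }
    }

    /// Appends a chunk and flushes as soon as `expected_len` bytes are buffered.
    pub fn push(&mut self, chunk: &[u8]) {
        let length = match self.buffer.as_mut() {
            Some(buffer) => {
                buffer.extend_from_slice(chunk);
                buffer.len() as u64
            }
            None => 0, // already flushed
        };
        if let Some(expected) = self.expected_len {
            if length >= expected {
                self.flush();
            }
        }
    }
}
```

Without a known length, nothing is sent until `flush()` is called explicitly, which is what the `Ok(Async::Ready(None))` arm of `poll` does.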

Then I implemented the Stream trait for this struct.

impl Stream for BodyClone<hyper::Body> {
    type Item = hyper::Chunk;
    type Error = hyper::Error;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        match self.body.poll() {
            Ok(Async::Ready(Some(chunk))) => {
                self.push(&chunk);
                Ok(Async::Ready(Some(chunk)))
            }
            Ok(Async::Ready(None)) => {
                self.flush();
                Ok(Async::Ready(None))
            }
            other => other,
        }
    }
}
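To see the three arms of `poll` in isolation, here is a rough synchronous analogue that uses `Iterator` instead of `Stream`. It is a sketch under stated assumptions: `TeeIter` and `tee_iter` are hypothetical names, `Result<Vec<u8>, E>` items stand in for `Poll<Option<Chunk>, Error>`, and `std::sync::mpsc` replaces the oneshot channel.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical std-only analogue of `BodyClone`'s `poll`: every item is
/// passed through unchanged, `Ok` chunks are also copied into a side buffer,
/// and the buffer is sent once the inner iterator is exhausted.
pub struct TeeIter<I> {
    inner: I,
    buffer: Option<Vec<u8>>,
    sender: Option<Sender<Vec<u8>>>,
}

pub fn tee_iter<I, E>(inner: I) -> (TeeIter<I>, Receiver<Vec<u8>>)
where
    I: Iterator<Item = Result<Vec<u8>, E>>,
{
    let (sender, receiver) = channel();
    let tee = TeeIter { inner, buffer: Some(Vec::new()), sender: Some(sender) };
    (tee, receiver)
}

impl<I, E> Iterator for TeeIter<I>
where
    I: Iterator<Item = Result<Vec<u8>, E>>,
{
    type Item = Result<Vec<u8>, E>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next() {
            Some(Ok(chunk)) => {
                // Like `push`: copy the bytes, then forward the chunk itself.
                if let Some(buffer) = self.buffer.as_mut() {
                    buffer.extend_from_slice(&chunk);
                }
                Some(Ok(chunk))
            }
            None => {
                // Like `flush`: `take()` guarantees the buffer is sent once.
                if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
                    let _ = sender.send(buffer);
                }
                None
            }
            // Like `other => other`: errors are forwarded untouched.
            other => other,
        }
    }
}
```

If the adapter is dropped before the input is exhausted, its `Sender` is dropped too and `recv()` returns an error; in the futures version the oneshot receiver likewise resolves with `Canceled`, which `receiver.map_err(|_| ())` turns into `()`.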

Finally, I defined an extension trait on hyper::Body:

pub type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()> + Send>;

trait CloneBody {
    fn clone_body(self) -> (hyper::Body, BufferFuture);
}

impl CloneBody for hyper::Body {
    fn clone_body(self) -> (hyper::Body, BufferFuture) {
        let (sender, receiver) = futures::sync::oneshot::channel();

        let cloning_stream = BodyClone {
            body: self,
            buffer: Some(Vec::new()),
            sender: Some(sender),
        };

        (
            hyper::Body::wrap_stream(cloning_stream),
            Box::new(receiver.map_err(|_| ())),
        )
    }
}

This can be used as follows:

let (body, buffer): (hyper::Body, BufferFuture) = body.clone_body();

In hyper v1 things changed quite a lot. To copy data from a hyper body while simultaneously forwarding it, I implemented a custom wrapper around the Body that clones the incoming data frames. I really struggled with this and don't know if there's a better approach. Here is my proposed solution:

use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;
use hyper::body::{Body, Incoming};
use tokio::sync::mpsc;

pub struct BodyReader<T> {
  inner: T,
  sender: mpsc::Sender<Bytes>,
}

impl<T: Body<Data = Bytes, Error = hyper::Error> + Unpin> Stream for BodyReader<T> {
  type Item = Result<Bytes, std::io::Error>;

  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
    loop {
      return match futures::ready!(Pin::new(&mut self.inner).poll_frame(cx)) {
        Some(Ok(frame)) => match frame.into_data() {
          Ok(buf) => {
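            // try_send never waits: if the channel (capacity 10) is full or
            // the receiver was dropped, this copy is discarded.
            let _ = self.sender.try_send(buf.clone());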
            Poll::Ready(Some(Ok(buf)))
          }
          Err(_) => continue,
        },
        Some(Err(err)) => Poll::Ready(Some(Err(std::io::Error::other(err)))),
        None => Poll::Ready(None),
      };
    }
  }
}

impl BodyReader<Incoming> {
  pub fn new(body: Incoming) -> (Self, mpsc::Receiver<Bytes>) {
    let (sender, receiver) = mpsc::channel(10);
    let reader = BodyReader {
      inner: body,
      sender,
    };

    (reader, receiver)
  }
}

type ReaderAndReceiver = (BodyReader<Incoming>, mpsc::Receiver<Bytes>);

pub trait BodyReading {
  fn read(self) -> ReaderAndReceiver;
}

impl BodyReading for Incoming {
  fn read(self) -> ReaderAndReceiver {
    BodyReader::new(self)
  }
}
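Note that `try_send` on a bounded channel does not wait: once 10 chunks are queued and the receiver has not drained them, further copies are discarded while the chunks themselves are still forwarded. The effect can be reproduced with `std::sync::mpsc::sync_channel`, whose `try_send` behaves analogously to tokio's; `forward_with_try_send` is a hypothetical helper for illustration only.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

/// Hypothetical std-only illustration of the `try_send` call in
/// `BodyReader::poll_next`: with a bounded channel and a receiver that is not
/// draining yet, copies beyond the capacity are dropped.
/// Returns the receiver and the number of copies that were dropped.
pub fn forward_with_try_send(chunks: &[&[u8]], capacity: usize) -> (Receiver<Vec<u8>>, usize) {
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let mut dropped = 0;
    for chunk in chunks {
        match tx.try_send(chunk.to_vec()) {
            Ok(()) => {}
            // The copy is lost; in the original code the chunk itself is
            // still returned downstream, so only the side buffer is affected.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => dropped += 1,
        }
    }
    (rx, dropped)
}
```

If every byte must be captured, one option is `tokio::sync::mpsc::unbounded_channel()`, whose `send` only fails once the receiver is closed, at the cost of unbounded memory growth if the receiver falls behind.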
