0

In my use case, I need to use the RSocket protocol to create a reactive stream over the network, and on the client side return a pekko Source, and on the server side return a pekko Sink. I have no problems on the client side because creating the connection returns a Flux, which can be trivially converted to a pekko Source with Source.fromPublisher.

On the server side, RSocket expects a Flux as input, but I have other code that is expecting a pekko Sink as a result (it can't take a ManyWithUpstream unless I can convert that to a Sink).

Here's a minimal code example that works flawlessly for my use case but relies on the deprecated EmitterProcessor.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

I have tried the following but when I try to run it nothing fed to the Sink will be actually fed into RSocket for reasons I don't completely understand.

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

1 Answer 1

0

So, I have a solution, but it's a little hacky. For reasons I don't understand, the publisher from pekko needs a subscriber other than the ones over the ones from RSocket. This works, but with the caveat that the sink will consume everything it can if there are no connected clients (which is actually preferred in my use case)

def serverSink : Sink[Payload, NotUsed] = {
    val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(true)
    sink.mapMaterializedValue { pub =>
      pub.subscribe(new Subscriber[Payload] {
        override def onComplete(): Unit = ()
        override def onError(t: Throwable): Unit = ()
        override def onNext(t: Payload): Unit = ()
        override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
      })
      RSocketServer.create(
        SocketAcceptor.forRequestStream(payload =>
          Flux.from(pub)
      )).bindNow(TcpServerTransport.create("localhost", 3141))
      NotUsed.notUsed()
    }
  }
Sign up to request clarification or add additional context in comments.

1 Comment

This may not be the most general or best answer so I am going to hold on a second before accepting it because I want to see if anyone has a better answer.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.