In my use case, I need to use the RSocket protocol to create a reactive stream over the network, and on the client side return a pekko Source, and on the server side return a pekko Sink. I have no problems on the client side because creating the connection returns a Flux, which can be trivially converted to a pekko Source with Source.fromPublisher.
On the server side, RSocket expects a Flux as input, but I have other code that is expecting a pekko Sink as a result (it can't take a ManyWithUpstream unless I can convert that to a Sink).
Here's a minimal code example that works flawlessly for my use case but relies on the deprecated EmitterProcessor.
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}
I have tried the following but when I try to run it nothing fed to the Sink will be actually fed into RSocket for reasons I don't completely understand.
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}