
I'm using com.squareup.wire.GrpcStreamingCall along with OkHttp in a Kotlin mobile app to stream gRPC events from a backend service. The issue I'm facing is that the client does not receive the streamed packets in real-time.

Instead, all packets are buffered internally and delivered together only when the server closes the connection or times out the session (e.g., after 5 minutes). This defeats the purpose of a streaming connection.

There is no problem on the backend side: the APIs work fine in Postman.

Expected Behavior

Packets should be delivered immediately to the client as the server sends them (real-time). The GrpcStreamingCall.response flow should emit each message as it arrives on the wire.

Actual Behavior

Packets are only emitted from the flow after several minutes — when the session is closed/timed out. All buffered packets arrive together at once, as if they were delayed or cached.

gRPC client builder

val okHttpClient = OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(0, TimeUnit.SECONDS) 
        .writeTimeout(30, TimeUnit.SECONDS)
        .addInterceptor(customHeadersInterceptor)
        .addInterceptor(httpToFileInterceptor)
        .addInterceptor(loggingInterceptor)
        .build()

GrpcClient.Builder()
            .client(okHttpClient)
            .baseUrl(baseUrl)
            .minMessageToCompress(Long.MAX_VALUE)
            .build()

Stream Events Implementation

override suspend fun startStreamingEvents(streamEventsRequest: StreamEventsRequest): Flow<StreamPacket> = callbackFlow {
    
    val streaming = call.authenticatedStream {  grpcStreamingClient.StreamEvents() }

    // Send the request
    streaming.request.send(streamEventsRequest)

    //Close the request stream!
    streaming.request.close() 

    val job = launch {
        try {
            for (packet in streaming.response) {
                send(packet)
            }
        } catch (e: Exception) {
            Timber.e(e, "Error while receiving packets")
            close(e)
        }
    }

    awaitClose {
        Timber.d("Stream closed")
        job.cancel()
    }
}

Supporting code

suspend fun <Req : Any, Res : Any> authenticatedStream(call: () -> GrpcStreamingCall<Req, Res>): StreamingCallResult<Req, Res> {
    val (request, response) = call().apply {
        requestMetadata = headerProvider.getAuthHeaders()
    }.executeIn(CoroutineScope(dispatchers.io))
    return StreamingCallResult(request, response)
}

data class StreamingCallResult<Req : Any, Res : Any>(
    val request: SendChannel<Req>,
    val response: ReceiveChannel<Res>
)

  • I have raised the same question on square/wire's GitHub page too, but there wasn't much help: github.com/square/wire/issues/3370 Commented Nov 4 at 14:01

2 Answers

0

If I understand correctly, your gRPC stream works, but the client receives all messages only at the end, when the server closes the connection, so nothing is emitted live.

This usually happens because the client closes the request stream too early:

streaming.request.close()

When using Wire/OkHttp, closing the request channel flushes and finalizes the HTTP request body. Once the server sees that the client is "done sending", it may buffer responses until the request is fully closed or until its own timeout triggers. That would explain why Postman works: it keeps the request open, but your app doesn’t. What happens:

  • You send the first message

  • You immediately close the request stream

  • The server waits for the “client half-close” to complete

  • Only then does it flush all responses, so you get everything at once

So the streaming itself is fine; the problem is when the request side gets closed. To fix this, do not close the request stream immediately.

Leave it open, so the server can push packets as they come:

// remove: streaming.request.close()

Only close it when you actually want the stream to end:

awaitClose {
    streaming.request.close()  // close here, not before
    job.cancel()
}

Once you stop closing the request upfront, messages should arrive in real time.
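Putting the two changes together, the function could look roughly like the sketch below. It is written against plain kotlinx.coroutines channels so that it stands on its own: streamAsFlow and its parameters are hypothetical names, not part of Wire, and the request/response pair is assumed to be the one held by StreamingCallResult in the question. It also closes the flow when the response channel completes, which the snippets above leave out.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch

// Hypothetical helper (not part of Wire): bridges a request/response channel
// pair, such as the one in StreamingCallResult, into a Flow.
fun <Req : Any, Res : Any> streamAsFlow(
    request: SendChannel<Req>,
    response: ReceiveChannel<Res>,
    firstMessage: Req,
): Flow<Res> = callbackFlow {
    // Send the request, but leave the request channel open for now.
    request.send(firstMessage)

    val job = launch {
        try {
            for (packet in response) {
                send(packet) // forward each message as soon as it is received
            }
            close() // the response channel completed normally
        } catch (e: CancellationException) {
            throw e // let coroutine cancellation propagate
        } catch (e: Exception) {
            close(e) // surface stream errors to the collector
        }
    }

    awaitClose {
        request.close() // close here, not before
        job.cancel()
    }
}
```

Whether leaving the request open helps depends on the server: some server-streaming handlers (grpc-java's, for example) only start once the client half-closes, so it is worth testing both variants against your backend.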


0

I don't know the Wire + OkHttp combination, so here is an example that does not use it, to show that a client whose channel is gone receives no further messages. In the example, the server emits 5 messages with a delay of 1 second between them, but only 4 of them are delivered because the client keeps its channel for only 4 seconds.

So close the channel only after your job is done. Also, keeping the channel alive has to be done explicitly: if you skip channel.shutdown() and assume the channel will stay open, it will not. In this example the channel goes away as soon as the client's main function returns.

Server:

import io.grpc.Server
import io.grpc.ServerBuilder

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

import streaming.StreamServiceGrpcKt
import streaming.Streaming

class GrpcServer
{
    val server = ServerBuilder.forPort(50051)
        .addService(HelloService())
        .build()

    val hook: Thread = Thread {
        println("Shutting down server")
        stop()
    }

    fun start() {
        server.start()
        Runtime.getRuntime().addShutdownHook(hook)
        println("Server started, listening on 50051")
    }

    fun stop() {
        server.shutdown()
        println("Server shut down")
    }

    fun blockUntilShutdown() {
        server.awaitTermination()
    }
}

class HelloService : StreamServiceGrpcKt.StreamServiceCoroutineImplBase()
{
    override fun streamEvents(request: Streaming.StreamRequest): Flow<Streaming.StreamResponse> = flow {

        for (i in 1..5)
        {
            val response = Streaming.StreamResponse.newBuilder()
                .setMessage("Hello #$i ${request.requestId}")
                .build()

            // NOTE: Do not forget to emit() response within flow
            emit(response)

            // NOTE: Delay for imitation of lasting process
            delay(1000)
        }
    }
}

fun main()
{
    val server = GrpcServer()
    server.start()

    server.blockUntilShutdown()
}

Client:

import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay

import streaming.StreamServiceGrpc
import streaming.Streaming

fun main(): Unit = runBlocking {

    val channel = ManagedChannelBuilder
        .forAddress("localhost", 50051)
        .usePlaintext()
        .build()

    val stub = StreamServiceGrpc.newStub(channel)

    val request = Streaming.StreamRequest.newBuilder()
        .setRequestId("ID-test-request!")
        .build()

    val responseObserver = object : io.grpc.stub.StreamObserver<Streaming.StreamResponse>
    {
        override fun onNext(value: Streaming.StreamResponse) {
            println("Received: ${value.message}")
        }

        override fun onError(t: Throwable) {
            println("Error: ${t.message}")
        }

        override fun onCompleted() {
            println("Stream completed")
        }
    }

    stub.streamEvents(request, responseObserver)

    // NOTE: Keep the application running to receive messages
    delay(4000)

    channel.shutdown()
}

Server Output:

Server started, listening on 50051

Client Output:

Received: Hello #1 ID-test-request!
Received: Hello #2 ID-test-request!
Received: Hello #3 ID-test-request!
Received: Hello #4 ID-test-request!

Process finished with exit code 0

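The server keeps running. On the client, the channel is shut down after 4 seconds and the process exits, so the fifth message is never received and onCompleted() is never called ("Stream completed" is not printed).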
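The count of four follows from simple timing, which the sketch below models. It is a simplification rather than a measurement: deliveredMessages is a hypothetical helper, network latency is ignored, and a message emitted at the very moment the client stops listening is assumed to be lost, which matches the output above.

```kotlin
// Simplified timeline of the example: the server emits message i at
// (i - 1) * intervalMs, and the client stops listening at clientLifetimeMs.
// Assumption: a message emitted exactly at the cut-off is not delivered.
fun deliveredMessages(count: Int, intervalMs: Long, clientLifetimeMs: Long): List<Int> =
    (1..count).filter { i -> (i - 1) * intervalMs < clientLifetimeMs }

fun main() {
    // Same numbers as the example: 5 messages, 1 s apart, client alive for 4 s.
    println(deliveredMessages(count = 5, intervalMs = 1000, clientLifetimeMs = 4000))
    // prints [1, 2, 3, 4]

    // Keeping the client alive slightly past the last emission lets all five arrive.
    println(deliveredMessages(count = 5, intervalMs = 1000, clientLifetimeMs = 4500))
    // prints [1, 2, 3, 4, 5]
}
```

In a real client, waiting for onCompleted() (for example with a latch or a CompletableDeferred) before shutting the channel down is more robust than a fixed delay.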
