I'm using com.squareup.wire.GrpcStreamingCall along with OkHttp in a Kotlin mobile app to stream gRPC events from a backend service. The issue I'm facing is that the client does not receive the streamed packets in real-time.
Instead, all packets are buffered internally and delivered together only when the server closes the connection or times out the session (e.g., after 5 minutes). This defeats the purpose of a streaming connection.
There is no problem on the backend side; the APIs work fine in Postman.
Expected Behavior
Packets should be delivered immediately to the client as the server sends them (real-time).
The GrpcStreamingCall.response flow should emit each message as it arrives on the wire.
Actual Behavior
Packets are only emitted from the flow after several minutes, when the session is closed or timed out. All buffered packets then arrive at once, as if they had been delayed or cached.
gRPC client builder
val okHttpClient = OkHttpClient.Builder()
    .connectTimeout(30, TimeUnit.SECONDS)
    // 0 disables the read timeout so a long-lived stream is not cut off.
    .readTimeout(0, TimeUnit.SECONDS)
    .writeTimeout(30, TimeUnit.SECONDS)
    .addInterceptor(customHeadersInterceptor)
    .addInterceptor(httpToFileInterceptor)
    .addInterceptor(loggingInterceptor)
    .build()

GrpcClient.Builder()
    .client(okHttpClient)
    .baseUrl(baseUrl)
    // Effectively disables compression of outgoing messages.
    .minMessageToCompress(Long.MAX_VALUE)
    .build()
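One thing worth ruling out in the builder above is an application interceptor that reads the response body. OkHttp's HttpLoggingInterceptor at Level.BODY buffers the entire response body before handing it on, which on a stream that never ends would hold every packet back until the connection closes. Whether loggingInterceptor is configured at that level, and whether httpToFileInterceptor reads the body, is not shown here, so this is only a diagnostic sketch under those assumptions. It assumes OkHttp 4.x with the logging-interceptor artifact; buildStreamingDiagnosticClient is a hypothetical name, and the headers interceptor is passed in as a parameter.

```kotlin
import java.util.concurrent.TimeUnit
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor

// Hypothetical helper for isolating the problem: same timeouts as the
// original client, but no interceptor that reads the response body.
fun buildStreamingDiagnosticClient(customHeadersInterceptor: Interceptor): OkHttpClient {
    // Level.HEADERS logs the request/response lines and headers only,
    // so the streamed body is never buffered by the logger.
    val headersOnlyLogging = HttpLoggingInterceptor().apply {
        level = HttpLoggingInterceptor.Level.HEADERS
    }
    return OkHttpClient.Builder()
        .connectTimeout(30, TimeUnit.SECONDS)
        .readTimeout(0, TimeUnit.SECONDS) // no read timeout for the long-lived stream
        .writeTimeout(30, TimeUnit.SECONDS)
        .addInterceptor(customHeadersInterceptor)
        // httpToFileInterceptor is deliberately left out for this test.
        .addInterceptor(headersOnlyLogging)
        .build()
}
```

If packets arrive in real time with this client, the interceptors can be added back one at a time to find the one that buffers the stream.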
Stream Events Implementation
override suspend fun startStreamingEvents(streamEventsRequest: StreamEventsRequest): Flow<StreamPacket> = callbackFlow {
    val streaming = call.authenticatedStream { grpcStreamingClient.StreamEvents() }
    // Send the single request message.
    streaming.request.send(streamEventsRequest)
    // Close the request stream so the server knows no more requests are coming.
    streaming.request.close()
    val job = launch {
        try {
            for (packet in streaming.response) {
                send(packet)
            }
            // The server ended the stream normally; complete the flow.
            close()
        } catch (e: Exception) {
            Timber.e(e, "Error while receiving packets")
            close(e)
        }
    }
    awaitClose {
        Timber.d("Stream closed")
        job.cancel()
    }
}
Supporting code
suspend fun <Req : Any, Res : Any> authenticatedStream(call: () -> GrpcStreamingCall<Req, Res>): StreamingCallResult<Req, Res> {
    val (request, response) = call().apply {
        requestMetadata = headerProvider.getAuthHeaders()
    }.executeIn(CoroutineScope(dispatchers.io))
    return StreamingCallResult(request, response)
}

data class StreamingCallResult<Req : Any, Res : Any>(
    val request: SendChannel<Req>,
    val response: ReceiveChannel<Res>
)