I'm experiencing a confusing timing issue with RxJava's Maybe operators in a high-throughput service (40 TPS) with concurrent execution. I have two classes that log execution times, but they're showing very different timings for what should be the same operation.
The Problem
Class A (the actual implementation):
public Maybe<Decision> execute(Input input) {
    long start = System.nanoTime();
    return Maybe.fromCallable(() -> {
            // Some processing that takes ~500ms
            return callExternalService();
        })
        .doOnSuccess(decision -> {
            log.info("Implementation took {} ms",
                ((System.nanoTime() - start) / 1000000)); // Logs ~500ms
        });
}
Class B (a decorator that wraps Class A):
public Maybe<Decision> execute(Input input) {
    return Maybe.defer(() -> {
        long startTime = System.nanoTime();
        return delegate.execute(input) // Calls Class A
            .doOnSuccess(decision -> {
                // Other success logic
            })
            .doFinally(() -> {
                log.info("Strategy took {} ms",
                    ((System.nanoTime() - startTime) / 1000000)); // Logs ~1300ms
            });
    });
}
The Timing Issue
Class A's doOnSuccess: Logs at ~500ms ✓ (expected)
Class B's doFinally: Logs at ~1300ms ❌ (800ms later than expected)
The doFinally in the decorator executes much later than the doOnSuccess in the implementation, even though both should be measuring the same operation. The doFinally log line is printed partway through the downstream processing, not at the end of it.
The Context
All the classes like 'A' are strategies. The strategies are run by an executor: all high-priority strategies are executed first in parallel, followed by the medium-priority and then the low-priority ones. Class A was a low-priority strategy, executed at the end.
@Singleton
public class StrategyExecutor {
    private final Executor executor; // ThreadContextAwareExecutor with cached thread pool
    public Maybe<Decision> strategyExecute(Request request, /* other params */) {
        return Maybe.defer(() -> {
            return Observable.fromArray(HIGH, MEDIUM, LOW)
                .concatMapMaybe(priority -> {
                    List<Strategy> strategies = strategyMap.get(priority);
                    // Convert each strategy to Maybe and run concurrently
                    List<Maybe<Decision>> maybesFromStrategies = strategies.stream()
                        .map(strategy -> strategy.execute(input)
                            .subscribeOn(Schedulers.from(executor))
                            .onErrorComplete() // Graceful error handling
                            .filter(Decision::isValid))
                        .collect(Collectors.toList());
                    return Maybe.merge(maybesFromStrategies)
                        .firstElement();
                })
                .firstElement();
        });
    }
}
High Throughput Environment:
Running at 40 TPS (transactions per second)
Multiple concurrent requests being processed
Each request runs on cached thread pools, with the executor shown below.
Executor Used - Executors.newCachedThreadPool()
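To check whether the cached thread pool itself adds a noticeable delay before a task starts, a probe along these lines could be run next to the service. This is only a sketch using plain java.util.concurrent; the class and method names (SchedulingDelayProbe, maxStartDelayMillis) are hypothetical, and on an idle machine it says little, so it would have to run under load comparable to the 40 TPS above to be meaningful.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical diagnostic, not part of the service code.
public class SchedulingDelayProbe {

    // Submits `tasks` trivial tasks and returns the worst observed gap (in ms)
    // between submitting a task and the moment its body starts running.
    static long maxStartDelayMillis(ExecutorService executor, int tasks) {
        List<Future<Long>> delays = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            final long submittedAt = System.nanoTime();
            // The task returns how long it waited before it began executing.
            delays.add(executor.submit(() -> System.nanoTime() - submittedAt));
        }
        long worstNanos = 0;
        try {
            for (Future<Long> delay : delays) {
                worstNanos = Math.max(worstNanos, delay.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("probe task failed", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(worstNanos);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            System.out.println("Worst start delay: "
                + maxStartDelayMillis(executor, 40) + " ms");
        } finally {
            executor.shutdown();
        }
    }
}
```

If the worst start delay stays in the low milliseconds under load, thread scheduling alone seems unlikely to account for an 800ms gap.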
The strategy executor's result is consumed by a top-level class that calls the downstream services. The downstream services are high-latency services.
public Observable<Response> processRequest(Request request) {
    return strategyExecutor.strategyExecute(request, /* params */) // Returns Maybe<Decision>
        .toObservable()
        .flatMap(decision -> {
            Service selectedService = serviceMap.get(decision.serviceName());
            return selectedService.process(request);
        });
}
Threading Concerns
Since we're running at high throughput with concurrent requests:
Could thread scheduling delays cause doFinally to execute much later?
Is doFinally waiting for some part of the downstream chain to complete before executing? (This does not happen for high/medium priority strategies.)
In a multithreaded environment, does doFinally behavior differ from single-threaded execution?
Questions
Is Maybe.merge() with firstElement() causing doFinally to wait for the entire downstream chain?
In this concurrent execution model, does doFinally wait until the consumer completely processes the result, not just when the Maybe emits?
Could the Executor and thread scheduling at 40 TPS be affecting when doFinally executes?
Is there a difference in how doFinally behaves when the Maybe is part of a Maybe.merge() operation vs. standalone execution?
What's the correct way to measure just the strategy execution time (not downstream processing) in this concurrent merge scenario?
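To make the second question concrete, here is a plain-Java model (no RxJava involved) of the ordering I suspect. It assumes, based on my reading of the doFinally Javadoc ("after this Maybe signals onSuccess"), that the operator first relays the value to the downstream consumer and runs its action only once that call returns. The helpers peekBefore and finallyAfter are hypothetical stand-ins for doOnSuccess and doFinally, not RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the callback ordering; it does not use RxJava.
public class DoFinallyOrderingModel {

    // Stand-in for doOnSuccess: run the side effect, then relay the value.
    static <T> Consumer<T> peekBefore(Consumer<T> downstream, Consumer<T> peek) {
        return value -> {
            peek.accept(value);
            downstream.accept(value);
        };
    }

    // Stand-in for doFinally (assumed behavior): relay the value first,
    // then run the action once the downstream call has returned.
    static <T> Consumer<T> finallyAfter(Consumer<T> downstream, Runnable action) {
        return value -> {
            downstream.accept(value); // synchronous downstream work happens here
            action.run();
        };
    }

    // Wires implementation -> decorator -> consumer and records the event order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Consumer<String> consumer = decision -> {
            events.add("downstream start");
            events.add("downstream end");
        };
        Consumer<String> decorator =
            finallyAfter(consumer, () -> events.add("B doFinally"));
        Consumer<String> implementation =
            peekBefore(decorator, decision -> events.add("A doOnSuccess"));
        implementation.accept("decision");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If the real operator behaves like this model, any downstream work that runs synchronously on the emitting thread would be included in Class B's measurement but not in Class A's.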
What I've Tried
I moved the timing logic to doOnSuccess and doOnError instead of doFinally, but I still want to understand whether the high throughput and threading model contribute to this behavior. If I use timeInterval() in the strategy executor and log from a doOnSuccess, the timing is as expected.
RxJava Version: 3.x
JDK 17