I'm experiencing a confusing timing issue with RxJava's Maybe operators in a high-throughput service (40 TPS) with concurrent execution. I have two classes that log execution times, but they're showing very different timings for what should be the same operation.

The Problem

Class A (the actual implementation):

public Maybe<Decision> execute(Input input) {
    long start = System.nanoTime();
    
    return Maybe.fromCallable(() -> {
        // Some processing that takes ~500ms
        return callExternalService();
    })
    .doOnSuccess(decision -> {
        log.info("Implementation took {} ms", 
                 ((System.nanoTime() - start) / 1000000)); // Logs ~500ms
    });
}

Class B (a decorator that wraps Class A):

public Maybe<Decision> execute(Input input) {
    return Maybe.defer(() -> {
        long startTime = System.nanoTime();
        
        return delegate.execute(input)  // Calls Class A
                .doOnSuccess(decision -> {
                    // Other success logic
                })
                .doFinally(() -> {
                    log.info("Strategy took {} ms",
                             ((System.nanoTime() - startTime) / 1000000)); // Logs ~1300ms
                });
    });
}

The Timing Issue

Class A's doOnSuccess: Logs at ~500ms ✓ (expected)

Class B's doFinally: Logs at ~1300ms ❌ (800ms later than expected)

The doFinally in the decorator executes much later than the doOnSuccess in the implementation, even though they should be measuring the same operation. The doFinally log is printed partway through the downstream processing, not at its end.

The Context

All the classes like Class A are strategies. The strategies are run by an executor: all high-priority strategies run first in parallel, followed by the medium-priority ones and then the low-priority ones. Class A is a low-priority strategy, so it runs last.

@Singleton
public class StrategyExecutor {
    private final Executor executor; // ThreadContextAwareExecutor with cached thread pool
    
    public Maybe<Decision> strategyExecute(Request request, /* other params */) {
        return Maybe.defer(() -> {
            return Observable.fromArray(HIGH, MEDIUM, LOW)               
                    .concatMapMaybe(priority -> {
                        List<Strategy> strategies = strategyMap.get(priority);
                        
                        // Convert each strategy to Maybe and run concurrently
                        List<Maybe<Decision>> maybesFromStrategies = strategies.stream()
                                .map(strategy -> strategy.execute(input)
                                        .subscribeOn(Schedulers.from(executor)) 
                                        .onErrorComplete() // Graceful error handling
                                        .filter(Decision::isValid))
                                .collect(Collectors.toList());

                        return Maybe.merge(maybesFromStrategies)
                                .firstElement();
                    })
                    .firstElement();
        });
    }
}

High Throughput Environment:

  1. Running at 40 TPS (transactions per second)

  2. Multiple concurrent requests being processed

  3. Each request runs on a cached thread pool, using the executor shown below.

Executor Used - Executors.newCachedThreadPool()

The strategy executor's result is consumed by a top-level class, which calls the downstream services. The downstream services are high-latency services.

public Observable<Response> processRequest(Request request) {
    return strategyExecutor.strategyExecute(request, /* params */)  // Returns Maybe<Decision>
            .toObservable()
            .flatMap(decision -> {
                Service selectedService = serviceMap.get(decision.serviceName());
                return selectedService.process(request); 
            });
}

Threading Concerns

Since we're running at high throughput with concurrent requests:

  1. Could thread scheduling delays cause doFinally to execute much later?

  2. Is doFinally waiting for some part of the downstream chain to complete before executing? (this does not happen for high/medium priority strategies)

  3. In a multithreaded environment, does doFinally behavior differ from single-threaded execution?

Questions

  1. Is Maybe.merge() with firstElement() causing doFinally to wait for the entire downstream chain?

  2. In this concurrent execution model, does doFinally wait until the consumer completely processes the result, not just when the Maybe emits?

  3. Could the Executor and thread scheduling at 40 TPS be affecting when doFinally executes?

  4. Is there a difference in how doFinally behaves when the Maybe is part of a Maybe.merge() operation vs. standalone execution?

What's the correct way to measure just the strategy execution time (not downstream processing) in this concurrent merge scenario?

What I've Tried

I moved the timing logic to doOnSuccess and doOnError instead of doFinally, but I want to understand whether the high throughput and threading model contribute to this behavior. If I use timeInterval() in the strategy executor and log from a doOnSuccess, it works as expected.

RxJava Version: 3.x

JDK 17

1 Answer

Functionally there is a clear difference between doOnSuccess and doFinally:

Maybe.doOnSuccess uses MaybePeek internally:

        @Override
        public void onSuccess(T value) {
            if (this.upstream == DisposableHelper.DISPOSED) {
                return;
            }
            try {
                parent.onSuccessCall.accept(value); // <-- callback happens here
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                onErrorInner(ex);
                return;
            }
            this.upstream = DisposableHelper.DISPOSED;

            downstream.onSuccess(value);

            onAfterTerminate();
        }

Maybe.doFinally uses MaybeDoFinally internally:

        @Override
        public void onSuccess(T t) {
            downstream.onSuccess(t);
            runFinally(); // <-- callback happens here
        }

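doOnSuccess runs its callback before passing the value downstream, whereas doFinally runs its callback only after downstream.onSuccess(t) has returned. So the most likely explanation is that the downstream operators take about 800 ms to execute.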
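The ordering difference can be reproduced without RxJava. The following is a minimal sketch in plain Java, not RxJava's actual classes: SuccessObserver, peek, and doFinally here are hypothetical stand-ins that copy only the call order from the two snippets above, and they ignore disposal, errors, and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CallbackOrderSketch {

    /** Hypothetical stand-in for a success observer; not an RxJava type. */
    interface SuccessObserver<T> {
        void onSuccess(T value);
    }

    /** Models doOnSuccess: run the callback first, then forward the value. */
    static <T> SuccessObserver<T> peek(Consumer<T> callback, SuccessObserver<T> downstream) {
        return value -> {
            callback.accept(value);
            downstream.onSuccess(value);
        };
    }

    /** Models doFinally: forward the value first, run the action once downstream returns. */
    static <T> SuccessObserver<T> doFinally(Runnable action, SuccessObserver<T> downstream) {
        return value -> {
            downstream.onSuccess(value);
            action.run();
        };
    }

    /** Pushes one value through the chain and returns the recorded event order. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // Final consumer, standing in for everything after the decorator.
        SuccessObserver<String> consumer = v -> events.add("downstream consumed " + v);
        // Chain mirrors: source -> doOnSuccess (Class A) -> doFinally (Class B) -> consumer
        SuccessObserver<String> withFinally = doFinally(() -> events.add("doFinally"), consumer);
        SuccessObserver<String> chain = peek(v -> events.add("doOnSuccess"), withFinally);
        chain.onSuccess("decision");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the doFinally event is always recorded after the consumer has handled the value, so any time the consumer spends synchronously inside onSuccess is included in a timer that stops in doFinally.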

2 Comments

What is the downstream here? Is it selectedService.process(request)? The selected service is high-latency and takes far longer than 800 ms to complete. From your experience, do you suspect anything else is going on with the scheduler or the internal queue maintained by Maybe.merge?
onSuccess flows down from the source observable, so the downstream is whatever operators you applied after the doFinally operator, all the way to the observer passed to the .subscribe call.
