I establish a Server-Sent Events (SSE) connection using the code below. The GET request must carry a "Last-Event-ID" HTTP header that indicates the last fully processed event. The server occasionally closes the connection, after which I need to reconnect. On each retry, however, I need to update the value of the "Last-Event-ID" header.

The code below works, including the retries, but it does not update the "Last-Event-ID" header before retrying. It always sends the value that was assigned initially.

Can anybody point me in the right direction on how to implement this correctly?

var type = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

var req = webClient.get()
    .uri("/the/end/point");

String lastEventId = loadLastId();
if (lastEventId != null && !lastEventId.isEmpty()) {
    req = req.header("Last-Event-ID", lastEventId);
}

req
    .retrieve()
    .bodyToFlux(type)
    .retryWhen(Retry
        .backoff(
            maxRetries,
            Duration.ofSeconds(retryBackoffStep)
        )
        .maxBackoff(Duration.ofSeconds(10))
        .transientErrors(true)
        .filter(t -> {
            log.warn("SSE Notification Channel: connection error detected", t);
            return !(t instanceof ClientAuthorizationException)
                && !(t instanceof WebClientResponseException.Forbidden)
                && !(t instanceof WebClientResponseException.NotFound);
        })
        .doBeforeRetry(signal -> {
            // How can I do the following?
            // req.header("Last-Event-ID", loadLastId());
            log.warn("SSE Notification Channel: retry SSE connection, attempt {}",
                            signal.totalRetriesInARow() + 1);
        })
        .doAfterRetry(signal ->
            log.warn("SSE Notification Channel: retried SSE connection, attempt {}, successful = {}",
                            signal.totalRetriesInARow() + 1, signal.failure() == null))
        .onRetryExhaustedThrow((retryBackoffSpec, signal) -> {
            log.error("SSE Notification Channel: SSE connection retries exhausted");
            return signal.failure();
        })
    )
    .subscribe(
        this::handleContent,
        this::handleError,
        () -> log.error("SSE Notification Channel: SSE connection closed")
    );

1 Answer

Once a request has been sent, it cannot be modified.

To send a modified request (for example, with a new header or a different request body), you need to create a new request.

Flux.defer() postpones creating the request until the Flux is subscribed to.
Because retryWhen() resubscribes on every retry, the supplier passed to Flux.defer() runs again each time and builds a new request with the current header value.

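Flux<ServerSentEvent<String>> requestFlux = Flux.defer(() -> {
    // Build a fresh request on every (re)subscription
    var req = webClient.get()
        .uri("/the/end/point");

    // Read the most recent ID each time, so a retry sends the current value
    String lastEventId = loadLastId();
    if (lastEventId != null && !lastEventId.isEmpty()) {
        req = req.header("Last-Event-ID", lastEventId);
    }

    return req.retrieve().bodyToFlux(type);
});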

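requestFlux
    .retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(retryBackoffStep))
        // ... further retry options, e.g. the ones used in the question
    )
    // ... then subscribe(...) as in the question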
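
To see why deferring helps without pulling in Reactor, here is a minimal plain-Java sketch of the same idea. It is only an analogy, not WebClient code: `DeferSketch`, `Request`, and `sendWithRetries` are hypothetical names, a `Supplier` stands in for the lambda passed to Flux.defer(), and the loop stands in for the resubscription that each retry triggers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class DeferSketch {
    // Hypothetical stand-in for the persisted "last fully processed event" ID.
    static String lastEventId = "1";

    // Hypothetical stand-in for an HTTP request; it only remembers the
    // header value it was built with.
    static final class Request {
        final String lastEventIdHeader;

        Request(String lastEventIdHeader) {
            this.lastEventIdHeader = lastEventIdHeader;
        }
    }

    // Asks the factory for a request once per attempt and records the header
    // that would have been sent. This mimics a retry resubscribing upstream.
    static List<String> sendWithRetries(Supplier<Request> requestFactory, int attempts) {
        List<String> sentHeaders = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            Request request = requestFactory.get();
            sentHeaders.add(request.lastEventIdHeader);
            // Simulate ten more events being processed before the connection drops.
            lastEventId = String.valueOf(Integer.parseInt(lastEventId) + 10);
        }
        return sentHeaders;
    }

    public static void main(String[] args) {
        // Eager: the request is built once, so every attempt reuses the old header.
        lastEventId = "1";
        Request eager = new Request(lastEventId);
        System.out.println(sendWithRetries(() -> eager, 3));

        // Deferred: the request is rebuilt per attempt and picks up the current ID.
        lastEventId = "1";
        System.out.println(sendWithRetries(() -> new Request(lastEventId), 3));
    }
}
```

Run as-is, the eager variant prints `[1, 1, 1]` and the deferred variant prints `[1, 11, 21]`. The first line is the behaviour described in the question, the second is what wrapping the request creation in Flux.defer() is meant to achieve.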