/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.common.client.sse;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.client.HttpRequestOptions;
import org.mule.runtime.http.api.domain.entity.EmptyHttpEntity;
import org.mule.runtime.http.api.domain.entity.EntitySubscription;
import org.mule.runtime.http.api.domain.entity.HttpEntity;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.request.HttpRequestBuilder;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.runtime.http.api.sse.ServerSentEvent;
import org.mule.runtime.http.api.sse.client.SseFailureContext;
import org.mule.runtime.http.api.sse.client.SseListener;
import org.mule.runtime.http.api.sse.client.SseSource;
import org.mule.runtime.http.api.sse.client.SseSourceConfig;
import org.mule.service.http.common.client.sse.InternalConnectable;
import org.mule.service.http.common.client.sse.RetryHelper;
import org.mule.service.http.common.client.sse.ServerSentEventDecoder;
import org.mule.service.http.common.client.sse.SseFailureContextImpl;
import org.mule.service.http.common.client.sse.SseStreamConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSseSource
implements SseSource,
SseListener,
InternalConnectable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSseSource.class);
    private HttpResponse httpResponse;
    private final String uri;
    private final HttpClient httpClient;
    private final Consumer<HttpRequestBuilder> requestCustomizer;
    private final HttpRequestOptions requestOptions;
    private final RetryHelper retryHelper;
    private final ScheduledExecutorService ioScheduler;
    private final AtomicInteger readyState;
    private final Map<String, SseListener> eventListenersByTopic = new ConcurrentHashMap<String, SseListener>();
    private SseListener fallbackListener = event -> {};
    private String lastEventId = null;
    private final List<Consumer<SseFailureContext>> onConnectionFailureCallbacks = new ArrayList<Consumer<SseFailureContext>>();
    private CompletableFuture<HttpResponse> responseFuture;
    private final boolean preserveHeaderCase;
    private EntitySubscription entitySubscription;

    public DefaultSseSource(SseSourceConfig config, HttpClient httpClient, ScheduledExecutorService ioScheduler) {
        this.uri = config.getUrl();
        this.httpClient = httpClient;
        this.requestCustomizer = config.getRequestCustomizer();
        this.requestOptions = config.getRequestOptions();
        this.readyState = new AtomicInteger(2);
        this.retryHelper = new RetryHelper(ioScheduler, config.getRetryConfig(), this);
        this.preserveHeaderCase = config.isPreserveHeaderCase();
        this.ioScheduler = ioScheduler;
        this.httpResponse = config.getResponse();
    }

    public int getReadyState() {
        return this.readyState.get();
    }

    public synchronized void open() {
        if (2 != this.readyState.get()) {
            LOGGER.debug("Connection is not needed when trying to open SSE source, skipping.");
            return;
        }
        if (this.httpResponse != null) {
            this.openFromResponse(this.httpResponse);
        } else {
            this.internalConnect();
        }
    }

    private void openFromResponse(HttpResponse response) {
        if (response == null) {
            throw new IllegalArgumentException("Response cannot be null when trying to open an SseSource");
        }
        if (!this.isSuccessfullyConnected(response)) {
            throw new IllegalArgumentException("Response is not a successfully established SSE connection. Status code: '%d', Content-Type: '%s'".formatted(response.getStatusCode(), response.getHeaderValue("Content-Type")));
        }
        this.readyState.set(1);
        this.httpResponse = response;
        HttpEntity entity = response.getEntity();
        ServerSentEventDecoder decoder = new ServerSentEventDecoder(this);
        if (!entity.isReactive()) {
            this.ioScheduler.submit(new SseStreamConsumer(response.getEntity(), decoder, this.httpClient.getName()));
            return;
        }
        this.entitySubscription = entity.onData(data -> DefaultSseSource.passDataToSseDecoder(data, decoder));
        entity.onComplete((ignoredTrailers, err) -> decoder.close());
    }

    public void doOnConnectionFailure(Consumer<SseFailureContext> onConnectionFailure) {
        this.onConnectionFailureCallbacks.add(onConnectionFailure);
    }

    public void register(SseListener listener) {
        this.fallbackListener = listener;
    }

    public void register(String eventName, SseListener listener) {
        this.eventListenersByTopic.put(eventName, listener);
    }

    public synchronized void close() {
        if (null != this.responseFuture) {
            this.responseFuture.cancel(true);
        }
        if (null != this.entitySubscription) {
            this.entitySubscription.cancel();
        }
        this.httpResponse = null;
        this.retryHelper.abortReties();
        this.onClose();
    }

    public synchronized void onEvent(ServerSentEvent event) {
        if (2 == this.readyState.get()) {
            throw new IllegalStateException("SSE source is already closed");
        }
        event.getId().ifPresent(id -> {
            this.lastEventId = id;
        });
        event.getRetryDelay().ifPresent(this.retryHelper::setDelayIfAllowed);
        this.eventListenersByTopic.getOrDefault(event.getName(), this.fallbackListener).onEvent(event);
    }

    public synchronized void onClose() {
        if (this.retryHelper.shouldRetryOnStreamEnd()) {
            this.readyState.set(0);
            this.retryHelper.scheduleReconnection();
        } else {
            this.eventListenersByTopic.values().forEach(SseListener::onClose);
            this.fallbackListener.onClose();
            this.readyState.set(2);
        }
    }

    @Override
    public synchronized void internalConnect() {
        this.readyState.set(0);
        HttpRequest request = this.createInitiatorRequest();
        this.responseFuture = this.httpClient.sendAsync(request, this.requestOptions);
        this.responseFuture.whenComplete(this::handleResponseOrError);
    }

    private synchronized void handleResponseOrError(HttpResponse httpResponse, Throwable error) {
        if (this.isSuccessfullyConnected(httpResponse)) {
            this.openFromResponse(httpResponse);
            return;
        }
        SseFailureContextImpl ctx = new SseFailureContextImpl(httpResponse, error, this.retryHelper);
        for (Consumer<SseFailureContext> callback : this.onConnectionFailureCallbacks) {
            callback.accept(ctx);
        }
        if (!this.retryHelper.isRetryEnabled()) {
            this.onClose();
            return;
        }
        this.readyState.set(0);
        this.retryHelper.scheduleReconnection();
    }

    private boolean isSuccessfullyConnected(HttpResponse httpResponse) {
        if (null == httpResponse) {
            return false;
        }
        if (HttpConstants.HttpStatus.OK.getStatusCode() != httpResponse.getStatusCode()) {
            return false;
        }
        return "text/event-stream".equalsIgnoreCase(httpResponse.getHeaderValue("Content-Type"));
    }

    private HttpRequest createInitiatorRequest() {
        HttpRequestBuilder builder = (HttpRequestBuilder)((HttpRequestBuilder)((HttpRequestBuilder)HttpRequest.builder((boolean)this.preserveHeaderCase).method(HttpConstants.Method.GET).uri(this.uri).addHeader("Accept", "text/event-stream")).addHeader("Cache-Control", "no-cache")).entity((HttpEntity)new EmptyHttpEntity());
        if (null != this.lastEventId) {
            builder.addHeader("Last-Event-ID", this.lastEventId);
        }
        this.requestCustomizer.accept(builder);
        return builder.build();
    }

    private static void passDataToSseDecoder(ByteBuffer data, ServerSentEventDecoder decoder) {
        int len = data.remaining();
        byte[] bytes = new byte[len];
        data.get(bytes);
        decoder.decode(bytes, len);
    }
}

