/*
 * Decompiled with CFR 0.152.
 */
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.logging.LogValues;
import com.launchdarkly.sdk.server.DiagnosticAccumulator;
import com.launchdarkly.sdk.server.StandardEndpoints;
import com.launchdarkly.sdk.server.StreamProcessorEvents;
import com.launchdarkly.sdk.server.Util;
import com.launchdarkly.sdk.server.interfaces.DataSource;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataSourceUpdates;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes;
import com.launchdarkly.sdk.server.interfaces.HttpConfiguration;
import com.launchdarkly.sdk.server.interfaces.SerializationException;
import com.launchdarkly.shaded.com.google.common.annotations.VisibleForTesting;
import com.launchdarkly.shaded.com.google.gson.JsonParseException;
import com.launchdarkly.shaded.com.google.gson.stream.JsonReader;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.ErrorStrategy;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.FaultEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.HttpConnectStrategy;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamClosedByCallerException;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamClosedByServerException;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamException;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamHttpErrorException;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.StreamIOException;
import com.launchdarkly.shaded.okhttp3.Headers;
import java.io.IOException;
import java.io.Reader;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

final class StreamProcessor
implements DataSource {
    private static final String PUT = "put";
    private static final String PATCH = "patch";
    private static final String DELETE = "delete";
    private static final Duration DEAD_CONNECTION_INTERVAL = Duration.ofSeconds(300L);
    private static final String ERROR_CONTEXT_MESSAGE = "in stream connection";
    private static final String WILL_RETRY_MESSAGE = "will retry";
    private final DataSourceUpdates dataSourceUpdates;
    private final HttpConfiguration httpConfig;
    private final Headers headers;
    @VisibleForTesting
    final URI streamUri;
    @VisibleForTesting
    final Duration initialReconnectDelay;
    private final DiagnosticAccumulator diagnosticAccumulator;
    private final int threadPriority;
    private final DataStoreStatusProvider.StatusListener statusListener;
    private volatile EventSource es;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private volatile long esStarted = 0L;
    private volatile boolean lastStoreUpdateFailed = false;
    private final LDLogger logger;

    StreamProcessor(HttpConfiguration httpConfig, DataSourceUpdates dataSourceUpdates, int threadPriority, DiagnosticAccumulator diagnosticAccumulator, URI streamUri, Duration initialReconnectDelay, LDLogger logger) {
        this.dataSourceUpdates = dataSourceUpdates;
        this.httpConfig = httpConfig;
        this.diagnosticAccumulator = diagnosticAccumulator;
        this.threadPriority = threadPriority;
        this.streamUri = streamUri;
        this.initialReconnectDelay = initialReconnectDelay;
        this.logger = logger;
        this.headers = Util.getHeadersBuilderFor(httpConfig).add("Accept", "text/event-stream").build();
        if (dataSourceUpdates.getDataStoreStatusProvider() != null && dataSourceUpdates.getDataStoreStatusProvider().isStatusMonitoringEnabled()) {
            this.statusListener = this::onStoreStatusChanged;
            dataSourceUpdates.getDataStoreStatusProvider().addStatusListener(this.statusListener);
        } else {
            this.statusListener = null;
        }
    }

    private void onStoreStatusChanged(DataStoreStatusProvider.Status newStatus) {
        EventSource stream;
        if (newStatus.isAvailable() && newStatus.isRefreshNeeded() && (stream = this.es) != null) {
            this.logger.warn("Restarting stream to refresh data after data store outage");
            stream.interrupt();
        }
    }

    @Override
    public Future<Void> start() {
        CompletableFuture<Void> initFuture = new CompletableFuture<Void>();
        URI endpointUri = Util.concatenateUriPath(this.streamUri, StandardEndpoints.STREAMING_REQUEST_PATH);
        HttpConnectStrategy eventSourceHttpConfig = ConnectStrategy.http(endpointUri).headers(this.headers).clientBuilderActions(clientBuilder -> Util.configureHttpClientBuilder(this.httpConfig, clientBuilder)).readTimeout(DEAD_CONNECTION_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
        EventSource.Builder builder = new EventSource.Builder(eventSourceHttpConfig).errorStrategy(ErrorStrategy.alwaysContinue()).logger(this.logger).readBufferSize(5000).streamEventData(true).expectFields("event").retryDelay(this.initialReconnectDelay.toMillis(), TimeUnit.MILLISECONDS);
        this.es = builder.build();
        Thread thread2 = new Thread(() -> {
            this.esStarted = System.currentTimeMillis();
            for (StreamEvent event : this.es.anyEvents()) {
                if (!this.handleEvent(event, initFuture)) break;
            }
        });
        thread2.setName("LaunchDarkly-streaming");
        thread2.setDaemon(true);
        thread2.setPriority(this.threadPriority);
        thread2.start();
        return initFuture;
    }

    private void recordStreamInit(boolean failed) {
        if (this.diagnosticAccumulator != null && this.esStarted != 0L) {
            this.diagnosticAccumulator.recordStreamInit(this.esStarted, System.currentTimeMillis() - this.esStarted, failed);
        }
    }

    @Override
    public void close() throws IOException {
        this.logger.info("Closing LaunchDarkly StreamProcessor");
        if (this.statusListener != null) {
            this.dataSourceUpdates.getDataStoreStatusProvider().removeStatusListener(this.statusListener);
        }
        if (this.es != null) {
            this.es.close();
        }
        this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.OFF, null);
    }

    @Override
    public boolean isInitialized() {
        return this.initialized.get();
    }

    private boolean handleEvent(StreamEvent event, CompletableFuture<Void> initFuture) {
        this.logger.debug("Received StreamEvent: {}", (Object)event);
        if (event instanceof MessageEvent) {
            this.handleMessage((MessageEvent)event, initFuture);
        } else if (event instanceof FaultEvent) {
            return this.handleError(((FaultEvent)event).getCause(), initFuture);
        }
        return true;
    }

    private void handleMessage(MessageEvent event, CompletableFuture<Void> initFuture) {
        try {
            switch (event.getEventName()) {
                case "put": {
                    this.handlePut(event.getDataReader(), initFuture);
                    break;
                }
                case "patch": {
                    this.handlePatch(event.getDataReader());
                    break;
                }
                case "delete": {
                    this.handleDelete(event.getDataReader());
                    break;
                }
                default: {
                    this.logger.warn("Unexpected event found in stream: {}", (Object)event.getEventName());
                }
            }
            this.lastStoreUpdateFailed = false;
            this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
        }
        catch (StreamInputException e) {
            this.logger.error("LaunchDarkly service request failed or received invalid data: {}", LogValues.exceptionSummary(e));
            this.logger.debug(LogValues.exceptionTrace(e));
            DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(e.getCause() instanceof IOException ? DataSourceStatusProvider.ErrorKind.NETWORK_ERROR : DataSourceStatusProvider.ErrorKind.INVALID_DATA, 0, e.getCause() == null ? e.getMessage() : e.getCause().toString(), Instant.now());
            this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, errorInfo);
            this.es.interrupt();
        }
        catch (StreamStoreException e) {
            if (this.statusListener == null) {
                if (!this.lastStoreUpdateFailed) {
                    this.logger.warn("Restarting stream to ensure that we have the latest data");
                }
                this.es.interrupt();
            }
            this.lastStoreUpdateFailed = true;
        }
        catch (Exception e) {
            this.logger.warn("Unexpected error from stream processor: {}", LogValues.exceptionSummary(e));
            this.logger.debug(LogValues.exceptionTrace(e));
        }
    }

    private void handlePut(Reader eventData, CompletableFuture<Void> initFuture) throws StreamInputException, StreamStoreException {
        this.recordStreamInit(false);
        this.esStarted = 0L;
        StreamProcessorEvents.PutData putData = StreamProcessor.parseStreamJson(StreamProcessorEvents::parsePutData, eventData);
        if (!this.dataSourceUpdates.init(putData.data)) {
            throw new StreamStoreException();
        }
        if (!this.initialized.getAndSet(true)) {
            initFuture.complete(null);
            this.logger.info("Initialized LaunchDarkly client.");
        }
    }

    private void handlePatch(Reader eventData) throws StreamInputException, StreamStoreException {
        StreamProcessorEvents.PatchData data = StreamProcessor.parseStreamJson(StreamProcessorEvents::parsePatchData, eventData);
        if (data.kind == null) {
            return;
        }
        if (!this.dataSourceUpdates.upsert(data.kind, data.key, data.item)) {
            throw new StreamStoreException();
        }
    }

    private void handleDelete(Reader eventData) throws StreamInputException, StreamStoreException {
        StreamProcessorEvents.DeleteData data = StreamProcessor.parseStreamJson(StreamProcessorEvents::parseDeleteData, eventData);
        if (data.kind == null) {
            return;
        }
        DataStoreTypes.ItemDescriptor placeholder = new DataStoreTypes.ItemDescriptor(data.version, null);
        if (!this.dataSourceUpdates.upsert(data.kind, data.key, placeholder)) {
            throw new StreamStoreException();
        }
    }

    private boolean handleError(StreamException e, CompletableFuture<Void> initFuture) {
        boolean streamFailed = true;
        if (e instanceof StreamClosedByCallerException) {
            streamFailed = false;
        } else {
            this.logger.warn("Encountered EventSource error: {}", LogValues.exceptionSummary(e));
        }
        this.recordStreamInit(streamFailed);
        if (e instanceof StreamHttpErrorException) {
            int status = ((StreamHttpErrorException)e).getCode();
            DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(status);
            boolean recoverable = Util.checkIfErrorIsRecoverableAndLog(this.logger, Util.httpErrorDescription(status), ERROR_CONTEXT_MESSAGE, status, WILL_RETRY_MESSAGE);
            if (recoverable) {
                this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, errorInfo);
                this.esStarted = System.currentTimeMillis();
                return true;
            }
            this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.OFF, errorInfo);
            initFuture.complete(null);
            return false;
        }
        boolean isNetworkError = e instanceof StreamIOException || e instanceof StreamClosedByServerException;
        Util.checkIfErrorIsRecoverableAndLog(this.logger, e.toString(), ERROR_CONTEXT_MESSAGE, 0, WILL_RETRY_MESSAGE);
        DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromException(isNetworkError ? DataSourceStatusProvider.ErrorKind.NETWORK_ERROR : DataSourceStatusProvider.ErrorKind.UNKNOWN, e);
        this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, errorInfo);
        return true;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static <T> T parseStreamJson(Function<JsonReader, T> parser, Reader r) throws StreamInputException {
        try (JsonReader jr = new JsonReader(r);){
            JsonReader t = parser.apply(jr);
            return (T)t;
        }
        catch (JsonParseException e) {
            throw new StreamInputException(e);
        }
        catch (SerializationException e) {
            throw new StreamInputException(e);
        }
        catch (IOException e) {
            throw new StreamInputException(e);
        }
    }

    private static final class StreamStoreException
    extends Exception {
        private StreamStoreException() {
        }
    }

    private static final class StreamInputException
    extends Exception {
        public StreamInputException(Throwable cause) {
            super(cause);
        }
    }
}

