/*
 * Decompiled with CFR 0.152.
 */
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.server.DataModel;
import com.launchdarkly.sdk.server.DiagnosticAccumulator;
import com.launchdarkly.sdk.server.FeatureRequestor;
import com.launchdarkly.sdk.server.JsonHelpers;
import com.launchdarkly.sdk.server.Loggers;
import com.launchdarkly.sdk.server.Util;
import com.launchdarkly.sdk.server.interfaces.DataSource;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataSourceUpdates;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes;
import com.launchdarkly.sdk.server.interfaces.HttpConfiguration;
import com.launchdarkly.sdk.server.interfaces.SerializationException;
import com.launchdarkly.shaded.com.google.common.annotations.VisibleForTesting;
import com.launchdarkly.shaded.com.google.gson.JsonElement;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.ConnectionErrorHandler;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.shaded.com.launchdarkly.eventsource.UnsuccessfulResponseException;
import com.launchdarkly.shaded.okhttp3.Headers;
import com.launchdarkly.shaded.okhttp3.OkHttpClient;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

final class StreamProcessor
implements DataSource {
    // Path segment appended to the base stream URI when the connection is opened in start().
    private static final String STREAM_URI_PATH = "all";
    // Names of the stream events that StreamEventHandler.onMessage dispatches on.
    private static final String PUT = "put";
    private static final String PATCH = "patch";
    private static final String DELETE = "delete";
    private static final Logger logger = Loggers.DATA_SOURCE;
    // Passed as the EventSource read timeout in defaultEventSourceCreator().
    private static final Duration DEAD_CONNECTION_INTERVAL = Duration.ofSeconds(300L);
    // Fragments handed to Util.checkIfErrorIsRecoverableAndLog when a connection error is reported.
    private static final String ERROR_CONTEXT_MESSAGE = "in stream connection";
    private static final String WILL_RETRY_MESSAGE = "will retry";
    // Sink for received data (init/upsert) and for data source status changes.
    private final DataSourceUpdates dataSourceUpdates;
    private final HttpConfiguration httpConfig;
    // Request headers built once in the constructor; includes "Accept: text/event-stream".
    private final Headers headers;
    @VisibleForTesting
    final URI streamUri;
    @VisibleForTesting
    final Duration initialReconnectDelay;
    // May be null, in which case no stream-init timings are recorded (see recordStreamInit).
    private final DiagnosticAccumulator diagnosticAccumulator;
    // Factory for the EventSource; the constructor substitutes defaultEventSourceCreator when given null.
    private final EventSourceCreator eventSourceCreator;
    private final int threadPriority;
    // Non-null only when the data store status provider reports that status monitoring is enabled;
    // registered in the constructor and unregistered in close().
    private final DataStoreStatusProvider.StatusListener statusListener;
    // The active stream; null until start() has been called.
    private volatile EventSource es;
    // Flipped to true by the first "put" event that is stored successfully.
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // System.currentTimeMillis() at which the connection attempt currently being timed began.
    // Zero means nothing is being timed: handlePut resets it to 0, and recordStreamInit skips a zero value.
    private volatile long esStarted = 0L;
    // True after a store update failed and until the next message is processed successfully;
    // only used to avoid repeating the "Restarting stream" warning.
    private volatile boolean lastStoreUpdateFailed = false;
    // Package-private and non-final. start() reads this field on every connection error, so a handler
    // assigned later takes effect (presumably a hook for tests - confirm against callers).
    ConnectionErrorHandler connectionErrorHandler = this.createDefaultConnectionErrorHandler();

    StreamProcessor(HttpConfiguration httpConfig, DataSourceUpdates dataSourceUpdates, EventSourceCreator eventSourceCreator, int threadPriority, DiagnosticAccumulator diagnosticAccumulator, URI streamUri, Duration initialReconnectDelay) {
        this.dataSourceUpdates = dataSourceUpdates;
        this.httpConfig = httpConfig;
        this.diagnosticAccumulator = diagnosticAccumulator;
        this.eventSourceCreator = eventSourceCreator != null ? eventSourceCreator : this::defaultEventSourceCreator;
        this.threadPriority = threadPriority;
        this.streamUri = streamUri;
        this.initialReconnectDelay = initialReconnectDelay;
        this.headers = Util.getHeadersBuilderFor(httpConfig).add("Accept", "text/event-stream").build();
        if (dataSourceUpdates.getDataStoreStatusProvider() != null && dataSourceUpdates.getDataStoreStatusProvider().isStatusMonitoringEnabled()) {
            this.statusListener = this::onStoreStatusChanged;
            dataSourceUpdates.getDataStoreStatusProvider().addStatusListener(this.statusListener);
        } else {
            this.statusListener = null;
        }
    }

    /**
     * Data store status callback: restarts the stream when the store reports that it is available
     * again and needs its data refreshed. Does nothing if the stream has not been created yet.
     *
     * @param newStatus the status reported by the data store status provider
     */
    private void onStoreStatusChanged(DataStoreStatusProvider.Status newStatus) {
        if (!newStatus.isAvailable() || !newStatus.isRefreshNeeded()) {
            return;
        }
        // Read the volatile field once so the null check and the call see the same instance.
        EventSource current = this.es;
        if (current == null) {
            return;
        }
        logger.warn("Restarting stream to refresh data after data store outage");
        current.restart();
    }

    /**
     * Builds the handler that is consulted whenever the stream connection fails.
     *
     * <p>Every failure is first recorded as a failed stream init. An HTTP error response is logged and
     * classified by {@code Util.checkIfErrorIsRecoverableAndLog}: a recoverable status reports
     * {@code INTERRUPTED}, restarts the init timer and returns {@code PROCEED}; an unrecoverable one
     * reports {@code OFF} and returns {@code SHUTDOWN}. Any other throwable reports {@code INTERRUPTED}
     * and returns {@code PROCEED}.
     *
     * @return the default connection error handler
     */
    private ConnectionErrorHandler createDefaultConnectionErrorHandler() {
        return error -> {
            this.recordStreamInit(true);

            if (!(error instanceof UnsuccessfulResponseException)) {
                // Not an HTTP status failure: log it and keep going.
                // NOTE(review): unlike the recoverable HTTP branch below, esStarted is not reset here - confirm this is intended.
                Util.checkIfErrorIsRecoverableAndLog(logger, error.toString(), ERROR_CONTEXT_MESSAGE, 0, WILL_RETRY_MESSAGE);
                DataSourceStatusProvider.ErrorKind kind = error instanceof IOException
                    ? DataSourceStatusProvider.ErrorKind.NETWORK_ERROR
                    : DataSourceStatusProvider.ErrorKind.UNKNOWN;
                DataSourceStatusProvider.ErrorInfo failure = DataSourceStatusProvider.ErrorInfo.fromException(kind, error);
                this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, failure);
                return ConnectionErrorHandler.Action.PROCEED;
            }

            int httpStatus = ((UnsuccessfulResponseException)error).getCode();
            DataSourceStatusProvider.ErrorInfo httpFailure = DataSourceStatusProvider.ErrorInfo.fromHttpError(httpStatus);
            boolean willRetry = Util.checkIfErrorIsRecoverableAndLog(logger, Util.httpErrorDescription(httpStatus), ERROR_CONTEXT_MESSAGE, httpStatus, WILL_RETRY_MESSAGE);
            if (!willRetry) {
                this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.OFF, httpFailure);
                return ConnectionErrorHandler.Action.SHUTDOWN;
            }
            this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, httpFailure);
            // Start timing the next connection attempt from now.
            this.esStarted = System.currentTimeMillis();
            return ConnectionErrorHandler.Action.PROCEED;
        };
    }

    /**
     * Creates the event source for {@code streamUri + "all"} and starts it.
     *
     * @return a future that completes (with {@code null}) either when the first "put" event has been
     *         stored, or when the connection error handler decides to shut the stream down
     */
    @Override
    public Future<Void> start() {
        final CompletableFuture<Void> ready = new CompletableFuture<>();
        StreamEventHandler eventHandler = new StreamEventHandler(ready);

        // Delegate to the currently configured handler, and release anyone waiting on the future
        // if the stream is being shut down for good.
        ConnectionErrorHandler shutdownAwareHandler = error -> {
            ConnectionErrorHandler.Action action = this.connectionErrorHandler.onConnectionError(error);
            if (action == ConnectionErrorHandler.Action.SHUTDOWN) {
                ready.complete(null);
            }
            return action;
        };

        URI endpoint = Util.concatenateUriPath(this.streamUri, STREAM_URI_PATH);
        EventSourceParams params = new EventSourceParams(eventHandler, endpoint, this.initialReconnectDelay, shutdownAwareHandler, this.headers, this.httpConfig);
        this.es = this.eventSourceCreator.createEventSource(params);
        this.esStarted = System.currentTimeMillis();
        this.es.start();
        return ready;
    }

    /**
     * Reports the outcome of the connection attempt that is currently being timed to the diagnostic
     * accumulator, if there is one. Nothing is recorded when no accumulator was supplied or when no
     * attempt is being timed ({@code esStarted == 0}).
     *
     * @param failed {@code true} if the attempt ended in a connection error, {@code false} if it
     *               produced a "put" event
     */
    private void recordStreamInit(boolean failed) {
        if (this.diagnosticAccumulator == null) {
            return;
        }
        // esStarted is volatile and is reset to 0 by handlePut, which may run concurrently with the
        // connection error handler. Take one snapshot so the zero check, the reported timestamp and
        // the computed duration all use the same value.
        final long startedAt = this.esStarted;
        if (startedAt != 0L) {
            this.diagnosticAccumulator.recordStreamInit(startedAt, System.currentTimeMillis() - startedAt, failed);
        }
    }

    /**
     * Shuts the processor down: unregisters the store status listener (if one was registered),
     * closes the event source (if one was created) and reports the data source state as {@code OFF}.
     *
     * @throws IOException declared by the interface; nothing in this method's visible code throws it directly
     */
    @Override
    public void close() throws IOException {
        logger.info("Closing LaunchDarkly StreamProcessor");

        final boolean listening = this.statusListener != null;
        if (listening) {
            this.dataSourceUpdates.getDataStoreStatusProvider().removeStatusListener(this.statusListener);
        }

        final boolean streamCreated = this.es != null;
        if (streamCreated) {
            this.es.close();
        }

        this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.OFF, null);
    }

    /**
     * @return {@code true} once a "put" event has been received and stored successfully
     */
    @Override
    public boolean isInitialized() {
        final boolean receivedFullData = initialized.get();
        return receivedFullData;
    }

    /**
     * Default {@link EventSourceCreator}: builds an {@link EventSource} from the supplied parameters,
     * using this processor's thread priority, the data source logger name, and
     * {@code DEAD_CONNECTION_INTERVAL} as the read timeout.
     *
     * @param params handler, URI, headers, reconnect delay, error handler and HTTP configuration to apply
     * @return the built (not yet started) event source
     */
    private EventSource defaultEventSourceCreator(final EventSourceParams params) {
        // Applies the SDK's HTTP configuration to the OkHttp client that the event source creates.
        EventSource.Builder.ClientConfigurer httpClientConfigurer = new EventSource.Builder.ClientConfigurer(){

            @Override
            public void configure(OkHttpClient.Builder clientBuilder) {
                Util.configureHttpClientBuilder(params.httpConfig, clientBuilder);
            }
        };
        return new EventSource.Builder(params.handler, params.streamUri)
            .threadPriority(this.threadPriority)
            .loggerBaseName(Loggers.DATA_SOURCE_LOGGER_NAME)
            .clientBuilderActions(httpClientConfigurer)
            .connectionErrorHandler(params.errorHandler)
            .headers(params.headers)
            .reconnectTime(params.initialReconnectDelay)
            .readTimeout(DEAD_CONNECTION_INTERVAL)
            .build();
    }

    /**
     * Splits a stream item path such as {@code /flags/my-key} or {@code /segments/my-key} into the
     * data kind and the item key.
     *
     * @param path the {@code path} property of a patch or delete event
     * @return the kind and key, or {@code null} when the path matches no known prefix (callers ignore
     *         such events)
     * @throws StreamInputException if {@code path} is {@code null}
     */
    private static Map.Entry<DataStoreTypes.DataKind, String> getKindAndKeyFromStreamApiPath(String path) throws StreamInputException {
        if (path == null) {
            throw new StreamInputException("missing item path");
        }
        for (DataStoreTypes.DataKind candidate : DataModel.ALL_DATA_KINDS) {
            // Segments have their own prefix; every other kind is looked up under "/flags/".
            final String prefix;
            if (candidate == DataModel.SEGMENTS) {
                prefix = "/segments/";
            } else {
                prefix = "/flags/";
            }
            if (path.startsWith(prefix)) {
                String itemKey = path.substring(prefix.length());
                return new AbstractMap.SimpleEntry<DataStoreTypes.DataKind, String>(candidate, itemKey);
            }
        }
        return null;
    }

    /**
     * Deserializes an event payload via {@code JsonHelpers.deserialize}, converting a
     * {@link SerializationException} into a {@link StreamInputException} that keeps the original as its cause.
     *
     * @param c    target class
     * @param json raw event data
     * @return whatever {@code JsonHelpers.deserialize} returns for the input
     * @throws StreamInputException if deserialization fails
     */
    private static <T> T parseStreamJson(Class<T> c, String json) throws StreamInputException {
        final T parsed;
        try {
            parsed = JsonHelpers.deserialize(json, c);
        }
        catch (SerializationException badJson) {
            throw new StreamInputException(badJson);
        }
        return parsed;
    }

    /**
     * Converts an already-parsed JSON element into a model object of the given kind via
     * {@code JsonHelpers.deserializeFromParsedJson}, converting a {@link SerializationException}
     * into a {@link StreamInputException} that keeps the original as its cause.
     *
     * @param kind       the data kind the element represents
     * @param parsedJson the parsed JSON for the item
     * @return whatever {@code JsonHelpers.deserializeFromParsedJson} returns for the input
     * @throws StreamInputException if deserialization fails
     */
    private static DataModel.VersionedData deserializeFromParsedJson(DataStoreTypes.DataKind kind, JsonElement parsedJson) throws StreamInputException {
        final DataModel.VersionedData item;
        try {
            item = JsonHelpers.deserializeFromParsedJson(kind, parsedJson);
        }
        catch (SerializationException badJson) {
            throw new StreamInputException(badJson);
        }
        return item;
    }

    /**
     * Factory for the {@link EventSource} used by this processor. The constructor accepts a custom
     * implementation and falls back to {@code defaultEventSourceCreator} when given {@code null}.
     */
    @FunctionalInterface
    interface EventSourceCreator {
        /**
         * @param params everything needed to configure the event source
         * @return the event source; {@code start()} calls {@code start()} on it after creation
         */
        EventSource createEventSource(EventSourceParams params);
    }

    private class StreamEventHandler
    implements EventHandler {
        private final CompletableFuture<Void> initFuture;

        StreamEventHandler(CompletableFuture<Void> initFuture) {
            this.initFuture = initFuture;
        }

        @Override
        public void onOpen() throws Exception {
        }

        @Override
        public void onClosed() throws Exception {
        }

        @Override
        public void onMessage(String name, MessageEvent event) throws Exception {
            try {
                switch (name) {
                    case "put": {
                        this.handlePut(event.getData());
                        break;
                    }
                    case "patch": {
                        this.handlePatch(event.getData());
                        break;
                    }
                    case "delete": {
                        this.handleDelete(event.getData());
                        break;
                    }
                    default: {
                        logger.warn("Unexpected event found in stream: " + name);
                    }
                }
                StreamProcessor.this.lastStoreUpdateFailed = false;
                StreamProcessor.this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
            }
            catch (StreamInputException e) {
                logger.error("LaunchDarkly service request failed or received invalid data: {}", (Object)e.toString());
                logger.debug(e.toString(), (Throwable)e);
                DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(e.getCause() instanceof IOException ? DataSourceStatusProvider.ErrorKind.NETWORK_ERROR : DataSourceStatusProvider.ErrorKind.INVALID_DATA, 0, e.getCause() == null ? e.getMessage() : e.getCause().toString(), Instant.now());
                StreamProcessor.this.dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.INTERRUPTED, errorInfo);
                StreamProcessor.this.es.restart();
            }
            catch (StreamStoreException e) {
                if (StreamProcessor.this.statusListener == null) {
                    if (!StreamProcessor.this.lastStoreUpdateFailed) {
                        logger.warn("Restarting stream to ensure that we have the latest data");
                    }
                    StreamProcessor.this.es.restart();
                }
                StreamProcessor.this.lastStoreUpdateFailed = true;
            }
            catch (Exception e) {
                logger.warn("Unexpected error from stream processor: {}", (Object)e.toString());
                logger.debug(e.toString(), (Throwable)e);
            }
        }

        private void handlePut(String eventData) throws StreamInputException, StreamStoreException {
            StreamProcessor.this.recordStreamInit(false);
            StreamProcessor.this.esStarted = 0L;
            PutData putData = (PutData)StreamProcessor.parseStreamJson(PutData.class, eventData);
            DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData = putData.data.toFullDataSet();
            if (!StreamProcessor.this.dataSourceUpdates.init(allData)) {
                throw new StreamStoreException();
            }
            if (!StreamProcessor.this.initialized.getAndSet(true)) {
                this.initFuture.complete(null);
                logger.info("Initialized LaunchDarkly client.");
            }
        }

        private void handlePatch(String eventData) throws StreamInputException, StreamStoreException {
            PatchData data = (PatchData)StreamProcessor.parseStreamJson(PatchData.class, eventData);
            Map.Entry kindAndKey = StreamProcessor.getKindAndKeyFromStreamApiPath(data.path);
            if (kindAndKey == null) {
                return;
            }
            DataStoreTypes.DataKind kind = (DataStoreTypes.DataKind)kindAndKey.getKey();
            String key = (String)kindAndKey.getValue();
            DataModel.VersionedData item = StreamProcessor.deserializeFromParsedJson(kind, data.data);
            if (!StreamProcessor.this.dataSourceUpdates.upsert(kind, key, new DataStoreTypes.ItemDescriptor(item.getVersion(), item))) {
                throw new StreamStoreException();
            }
        }

        private void handleDelete(String eventData) throws StreamInputException, StreamStoreException {
            DeleteData data = (DeleteData)StreamProcessor.parseStreamJson(DeleteData.class, eventData);
            Map.Entry kindAndKey = StreamProcessor.getKindAndKeyFromStreamApiPath(data.path);
            if (kindAndKey == null) {
                return;
            }
            DataStoreTypes.DataKind kind = (DataStoreTypes.DataKind)kindAndKey.getKey();
            String key = (String)kindAndKey.getValue();
            DataStoreTypes.ItemDescriptor placeholder = new DataStoreTypes.ItemDescriptor(data.version, null);
            if (!StreamProcessor.this.dataSourceUpdates.upsert(kind, key, placeholder)) {
                throw new StreamStoreException();
            }
        }

        @Override
        public void onComment(String comment) {
            logger.debug("Received a heartbeat");
        }

        @Override
        public void onError(Throwable throwable) {
            logger.warn("Encountered EventSource error: {}", (Object)throwable.toString());
            logger.debug(throwable.toString(), throwable);
        }
    }

    /**
     * Immutable bundle of everything an {@link EventSourceCreator} needs in order to build an
     * {@link EventSource}; assembled in {@code start()}.
     */
    static final class EventSourceParams {
        /** Receives the stream's events. */
        final EventHandler handler;
        /** Full URI of the stream endpoint. */
        final URI streamUri;
        /** Reconnect time given to the event source. */
        final Duration initialReconnectDelay;
        /** Consulted on every connection failure. */
        final ConnectionErrorHandler errorHandler;
        /** Request headers to send. */
        final Headers headers;
        /** HTTP settings applied to the underlying client. */
        final HttpConfiguration httpConfig;

        EventSourceParams(EventHandler handler, URI streamUri, Duration initialReconnectDelay, ConnectionErrorHandler errorHandler, Headers headers, HttpConfiguration httpConfig) {
            this.httpConfig = httpConfig;
            this.headers = headers;
            this.errorHandler = errorHandler;
            this.initialReconnectDelay = initialReconnectDelay;
            this.streamUri = streamUri;
            this.handler = handler;
        }
    }

    /**
     * Signals that an event received from the stream could not be used: either its content was
     * missing something required (message constructor) or it failed to deserialize (cause constructor).
     */
    private static final class StreamInputException
    extends Exception {
        /** @param cause the underlying failure, e.g. a serialization error */
        public StreamInputException(Throwable cause) {
            super(cause);
        }

        /** @param message description of what was wrong with the input */
        public StreamInputException(String message) {
            super(message);
        }
    }

    // Payload of a "delete" event; populated by StreamProcessor.parseStreamJson.
    // NOTE(review): field names presumably map to JSON property names via reflection - do not rename without confirming.
    private static final class DeleteData {
        // Item path, e.g. "/flags/<key>" or "/segments/<key>" (see getKindAndKeyFromStreamApiPath).
        String path;
        // Version stored in the placeholder descriptor that replaces the deleted item.
        int version;
    }

    // Payload of a "patch" event; populated by StreamProcessor.parseStreamJson.
    // NOTE(review): field names presumably map to JSON property names via reflection - do not rename without confirming.
    private static final class PatchData {
        // Item path, e.g. "/flags/<key>" or "/segments/<key>" (see getKindAndKeyFromStreamApiPath).
        String path;
        // Still-unparsed JSON of the item; converted once the data kind is known from the path.
        JsonElement data;
    }

    // Payload of a "put" event; populated by StreamProcessor.parseStreamJson.
    // NOTE(review): field name presumably maps to a JSON property name via reflection - do not rename without confirming.
    private static final class PutData {
        // The complete data set; converted with toFullDataSet() before being passed to dataSourceUpdates.init.
        FeatureRequestor.AllData data;
    }

    /**
     * Marker exception thrown by the event handlers when {@code dataSourceUpdates.init} or
     * {@code dataSourceUpdates.upsert} returns {@code false}. It carries no message or cause.
     */
    private static final class StreamStoreException
    extends Exception {
        private StreamStoreException() {
            super();
        }
    }
}

