/*
 * Decompiled with CFR 0.152.
 */
package com.launchdarkly.sdk.server;

import com.launchdarkly.eventsource.ConnectionErrorHandler;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.internal.events.DiagnosticStore;
import com.launchdarkly.sdk.server.BaseTest;
import com.launchdarkly.sdk.server.Components;
import com.launchdarkly.sdk.server.ComponentsImpl;
import com.launchdarkly.sdk.server.DataModel;
import com.launchdarkly.sdk.server.DataStoreTestTypes;
import com.launchdarkly.sdk.server.InMemoryDataStore;
import com.launchdarkly.sdk.server.LDConfig;
import com.launchdarkly.sdk.server.ModelBuilders;
import com.launchdarkly.sdk.server.StandardEndpoints;
import com.launchdarkly.sdk.server.StreamProcessor;
import com.launchdarkly.sdk.server.TestComponents;
import com.launchdarkly.sdk.server.TestUtil;
import com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.subsystems.ClientContext;
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
import com.launchdarkly.sdk.server.subsystems.DataStore;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
import com.launchdarkly.sdk.server.subsystems.HttpConfiguration;
import com.launchdarkly.testhelpers.ConcurrentHelpers;
import com.launchdarkly.testhelpers.httptest.Handler;
import com.launchdarkly.testhelpers.httptest.Handlers;
import com.launchdarkly.testhelpers.httptest.HttpServer;
import com.launchdarkly.testhelpers.httptest.RequestInfo;
import com.launchdarkly.testhelpers.httptest.SpecialHttpConfigurations;
import com.launchdarkly.testhelpers.tcptest.TcpHandler;
import com.launchdarkly.testhelpers.tcptest.TcpHandlers;
import com.launchdarkly.testhelpers.tcptest.TcpServer;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamProcessorTest
extends BaseTest {
    private static final String SDK_KEY = "sdk_key";
    private static final Duration BRIEF_RECONNECT_DELAY = Duration.ofMillis(10L);
    private static final String FEATURE1_KEY = "feature1";
    private static final int FEATURE1_VERSION = 11;
    private static final DataModel.FeatureFlag FEATURE = ModelBuilders.flagBuilder("feature1").version(11).build();
    private static final String SEGMENT1_KEY = "segment1";
    private static final int SEGMENT1_VERSION = 22;
    private static final DataModel.Segment SEGMENT = ModelBuilders.segmentBuilder("segment1").version(22).build();
    private static final String EMPTY_DATA_EVENT = StreamProcessorTest.makePutEvent(new DataStoreTestTypes.DataBuilder().addAny(DataModel.FEATURES, new DataModel.VersionedData[0]).addAny(DataModel.SEGMENTS, new DataModel.VersionedData[0]));
    private InMemoryDataStore dataStore;
    private TestComponents.MockDataSourceUpdates dataSourceUpdates;
    private TestComponents.MockDataStoreStatusProvider dataStoreStatusProvider;

    private static Handler streamResponse(String data) {
        return Handlers.all((Handler[])new Handler[]{Handlers.SSE.start(), Handlers.SSE.event((String)data), Handlers.SSE.leaveOpen()});
    }

    private static Handler closableStreamResponse(String data, Semaphore closeSignal) {
        return Handlers.all((Handler[])new Handler[]{Handlers.SSE.start(), Handlers.SSE.event((String)data), Handlers.waitFor((Semaphore)closeSignal)});
    }

    private static Handler streamResponseFromQueue(BlockingQueue<String> events) {
        return Handlers.all((Handler[])new Handler[]{Handlers.SSE.start(), ctx -> {
            try {
                while (true) {
                    String event = (String)events.take();
                    Handlers.SSE.event((String)event).apply(ctx);
                }
            }
            catch (InterruptedException e) {
                return;
            }
        }});
    }

    private static String makeEvent(String type, String data) {
        return "event: " + type + "\ndata: " + data;
    }

    private static String makePutEvent(DataStoreTestTypes.DataBuilder data) {
        return StreamProcessorTest.makeEvent("put", "{\"data\":" + data.buildJson().toJsonString() + "}");
    }

    private static String makePatchEvent(String path, DataStoreTypes.DataKind kind, DataModel.VersionedData item) {
        String json = kind.serialize(new DataStoreTypes.ItemDescriptor(item.getVersion(), (Object)item));
        return StreamProcessorTest.makeEvent("patch", "{\"path\":\"" + path + "\",\"data\":" + json + "}");
    }

    private static String makeDeleteEvent(String path, int version) {
        return StreamProcessorTest.makeEvent("delete", "{\"path\":\"" + path + "\",\"version\":" + version + "}");
    }

    @Before
    public void setup() {
        this.dataStore = new InMemoryDataStore();
        this.dataStoreStatusProvider = new TestComponents.MockDataStoreStatusProvider();
        this.dataSourceUpdates = TestComponents.dataSourceUpdates((DataStore)this.dataStore, this.dataStoreStatusProvider);
    }

    @Test
    public void builderHasDefaultConfiguration() throws Exception {
        StreamingDataSourceBuilder f = Components.streamingDataSource();
        try (StreamProcessor sp = (StreamProcessor)f.build((ClientContext)TestComponents.clientContext(SDK_KEY, LDConfig.DEFAULT).withDataSourceUpdateSink((DataSourceUpdateSink)this.dataSourceUpdates));){
            MatcherAssert.assertThat((Object)sp.initialReconnectDelay, (Matcher)Matchers.equalTo((Object)StreamingDataSourceBuilder.DEFAULT_INITIAL_RECONNECT_DELAY));
            MatcherAssert.assertThat((Object)sp.streamUri, (Matcher)Matchers.equalTo((Object)StandardEndpoints.DEFAULT_STREAMING_BASE_URI));
        }
    }

    @Test
    public void builderCanSpecifyConfiguration() throws Exception {
        StreamingDataSourceBuilder f = Components.streamingDataSource().initialReconnectDelay(Duration.ofMillis(5555L));
        try (StreamProcessor sp = (StreamProcessor)f.build((ClientContext)TestComponents.clientContext(SDK_KEY, LDConfig.DEFAULT).withDataSourceUpdateSink((DataSourceUpdateSink)TestComponents.dataSourceUpdates((DataStore)this.dataStore)));){
            MatcherAssert.assertThat((Object)sp.initialReconnectDelay, (Matcher)Matchers.equalTo((Object)Duration.ofMillis(5555L)));
        }
    }

    @Test
    public void verifyStreamRequestProperties() throws Exception {
        HttpConfiguration httpConfig = TestComponents.clientContext(SDK_KEY, LDConfig.DEFAULT).getHttp();
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            RequestInfo req = server.getRecorder().requireRequest();
            MatcherAssert.assertThat((Object)req.getMethod(), (Matcher)Matchers.equalTo((Object)"GET"));
            MatcherAssert.assertThat((Object)req.getPath(), (Matcher)Matchers.equalTo((Object)"/all"));
            for (Map.Entry kv : httpConfig.getDefaultHeaders()) {
                MatcherAssert.assertThat((Object)req.getHeader((String)kv.getKey()), (Matcher)Matchers.equalTo((Object)((String)kv.getValue())));
            }
            MatcherAssert.assertThat((Object)req.getHeader("Accept"), (Matcher)Matchers.equalTo((Object)"text/event-stream"));
        }
    }

    @Test
    public void streamBaseUriDoesNotNeedTrailingSlash() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT));){
            URI baseUri = server.getUri();
            MatcherAssert.assertThat((Object)baseUri.toString(), (Matcher)Matchers.endsWith((String)"/"));
            URI trimmedUri = URI.create(server.getUri().toString().substring(0, server.getUri().toString().length() - 1));
            try (StreamProcessor sp = this.createStreamProcessor(null, trimmedUri);){
                sp.start();
                RequestInfo req = server.getRecorder().requireRequest();
                MatcherAssert.assertThat((Object)req.getPath(), (Matcher)Matchers.equalTo((Object)"/all"));
            }
        }
    }

    @Test
    public void streamBaseUriCanHaveContextPath() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT));){
            URI baseUri = server.getUri().resolve("/context/path");
            try (StreamProcessor sp = this.createStreamProcessor(null, baseUri);){
                sp.start();
                RequestInfo req = server.getRecorder().requireRequest();
                MatcherAssert.assertThat((Object)req.getPath(), (Matcher)Matchers.equalTo((Object)"/context/path/all"));
            }
        }
    }

    @Test
    public void putCausesFeatureToBeStored() throws Exception {
        DataModel.FeatureFlag flag = ModelBuilders.flagBuilder(FEATURE1_KEY).version(11).build();
        DataStoreTestTypes.DataBuilder data = new DataStoreTestTypes.DataBuilder().addAny(DataModel.FEATURES, new DataModel.VersionedData[]{flag}).addAny(DataModel.SEGMENTS, new DataModel.VersionedData[0]);
        Handler streamHandler = StreamProcessorTest.streamResponse(StreamProcessorTest.makePutEvent(data));
        try (HttpServer server = HttpServer.start((Handler)streamHandler);
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            this.dataSourceUpdates.awaitInit();
            this.assertFeatureInStore(flag);
        }
    }

    @Test
    public void putCausesSegmentToBeStored() throws Exception {
        DataModel.Segment segment = ModelBuilders.segmentBuilder(SEGMENT1_KEY).version(22).build();
        DataStoreTestTypes.DataBuilder data = new DataStoreTestTypes.DataBuilder().addAny(DataModel.FEATURES, new DataModel.VersionedData[0]).addAny(DataModel.SEGMENTS, new DataModel.VersionedData[]{segment});
        Handler streamHandler = StreamProcessorTest.streamResponse(StreamProcessorTest.makePutEvent(data));
        try (HttpServer server = HttpServer.start((Handler)streamHandler);
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            this.dataSourceUpdates.awaitInit();
            this.assertSegmentInStore(SEGMENT);
        }
    }

    @Test
    public void storeNotInitializedByDefault() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(""));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            Assert.assertFalse((boolean)this.dataStore.isInitialized());
        }
    }

    @Test
    public void processorNotInitializedByDefault() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(""));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            Assert.assertFalse((boolean)sp.isInitialized());
        }
    }

    @Test
    public void futureIsNotSetByDefault() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(""));
             StreamProcessor sp = this.createStreamProcessor(server.getUri());){
            Future future = sp.start();
            Assert.assertFalse((boolean)future.isDone());
        }
    }

    @Test
    public void putCausesStoreAndProcessorToBeInitialized() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            Future future = sp.start();
            this.dataSourceUpdates.awaitInit();
            ConcurrentHelpers.assertFutureIsCompleted((Future)future, (long)1L, (TimeUnit)TimeUnit.SECONDS);
            Assert.assertTrue((boolean)this.dataStore.isInitialized());
            Assert.assertTrue((boolean)sp.isInitialized());
            Assert.assertTrue((boolean)future.isDone());
        }
    }

    @Test
    public void patchUpdatesFeature() throws Exception {
        this.doPatchSuccessTest(DataModel.FEATURES, (DataModel.VersionedData)FEATURE, "/flags/" + FEATURE.getKey());
    }

    @Test
    public void patchUpdatesSegment() throws Exception {
        this.doPatchSuccessTest(DataModel.SEGMENTS, (DataModel.VersionedData)SEGMENT, "/segments/" + SEGMENT.getKey());
    }

    private void doPatchSuccessTest(DataStoreTypes.DataKind kind, DataModel.VersionedData item, String path) throws Exception {
        LinkedBlockingQueue<String> events = new LinkedBlockingQueue<String>();
        events.add(EMPTY_DATA_EVENT);
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponseFromQueue(events));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            this.dataSourceUpdates.awaitInit();
            events.add(StreamProcessorTest.makePatchEvent(path, kind, item));
            TestComponents.MockDataSourceUpdates.UpsertParams gotUpsert = this.dataSourceUpdates.awaitUpsert();
            MatcherAssert.assertThat((Object)gotUpsert.kind, (Matcher)Matchers.equalTo((Object)kind));
            MatcherAssert.assertThat((Object)gotUpsert.key, (Matcher)Matchers.equalTo((Object)item.getKey()));
            MatcherAssert.assertThat((Object)gotUpsert.item.getVersion(), (Matcher)Matchers.equalTo((Object)item.getVersion()));
            DataStoreTypes.ItemDescriptor result = this.dataStore.get(kind, item.getKey());
            Assert.assertNotNull((Object)result.getItem());
            Assert.assertEquals((long)item.getVersion(), (long)result.getVersion());
        }
    }

    @Test
    public void deleteDeletesFeature() throws Exception {
        this.doDeleteSuccessTest(DataModel.FEATURES, (DataModel.VersionedData)FEATURE, "/flags/" + FEATURE.getKey());
    }

    @Test
    public void deleteDeletesSegment() throws Exception {
        this.doDeleteSuccessTest(DataModel.SEGMENTS, (DataModel.VersionedData)SEGMENT, "/segments/" + SEGMENT.getKey());
    }

    private void doDeleteSuccessTest(DataStoreTypes.DataKind kind, DataModel.VersionedData item, String path) throws Exception {
        LinkedBlockingQueue<String> events = new LinkedBlockingQueue<String>();
        events.add(EMPTY_DATA_EVENT);
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponseFromQueue(events));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            this.dataSourceUpdates.awaitInit();
            this.dataStore.upsert(kind, item.getKey(), new DataStoreTypes.ItemDescriptor(item.getVersion(), (Object)item));
            events.add(StreamProcessorTest.makeDeleteEvent(path, item.getVersion() + 1));
            TestComponents.MockDataSourceUpdates.UpsertParams gotUpsert = this.dataSourceUpdates.awaitUpsert();
            MatcherAssert.assertThat((Object)gotUpsert.kind, (Matcher)Matchers.equalTo((Object)kind));
            MatcherAssert.assertThat((Object)gotUpsert.key, (Matcher)Matchers.equalTo((Object)item.getKey()));
            MatcherAssert.assertThat((Object)gotUpsert.item.getVersion(), (Matcher)Matchers.equalTo((Object)(item.getVersion() + 1)));
            Assert.assertEquals((Object)DataStoreTypes.ItemDescriptor.deletedItem((int)(item.getVersion() + 1)), (Object)this.dataStore.get(kind, item.getKey()));
        }
    }

    @Test
    public void unknownEventTypeDoesNotCauseError() throws Exception {
        this.verifyEventCausesNoStreamRestart("what", "");
    }

    @Test
    public void streamWillReconnectAfterGeneralIOException() throws Exception {
        Handler streamHandler = StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT);
        try (HttpServer server = HttpServer.start((Handler)streamHandler);){
            TcpHandler errorThenSuccess = TcpHandlers.sequential((TcpHandler[])new TcpHandler[]{TcpHandlers.noResponse(), TcpHandlers.forwardToPort((int)server.getPort())});
            try (TcpServer forwardingServer = TcpServer.start((TcpHandler)errorThenSuccess);
                 StreamProcessor sp = this.createStreamProcessor(null, forwardingServer.getHttpUri());){
                StreamProcessorTest.startAndWait(sp);
                MatcherAssert.assertThat((Object)server.getRecorder().count(), (Matcher)Matchers.equalTo((Object)1));
                MatcherAssert.assertThat((Object)this.dataSourceUpdates.getLastStatus().getLastError(), (Matcher)Matchers.notNullValue());
                MatcherAssert.assertThat((Object)this.dataSourceUpdates.getLastStatus().getLastError().getKind(), (Matcher)Matchers.equalTo((Object)DataSourceStatusProvider.ErrorKind.NETWORK_ERROR));
            }
        }
    }

    @Test
    public void streamInitDiagnosticRecordedOnOpen() throws Exception {
        DiagnosticStore acc = TestComponents.basicDiagnosticStore();
        long startTime = System.currentTimeMillis();
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri(), acc);){
            StreamProcessorTest.startAndWait(sp);
            long timeAfterOpen = System.currentTimeMillis();
            LDValue event = acc.createEventAndReset(0L, 0L).getJsonValue();
            LDValue streamInits = event.get("streamInits");
            Assert.assertEquals((long)1L, (long)streamInits.size());
            LDValue init = streamInits.get(0);
            Assert.assertFalse((boolean)init.get("failed").booleanValue());
            MatcherAssert.assertThat((Object)init.get("timestamp").longValue(), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(startTime)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(timeAfterOpen))));
            MatcherAssert.assertThat((Object)init.get("durationMillis").longValue(), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(timeAfterOpen - startTime)));
        }
    }

    @Test
    public void streamInitDiagnosticRecordedOnErrorDuringInit() throws Exception {
        DiagnosticStore acc = TestComponents.basicDiagnosticStore();
        long startTime = System.currentTimeMillis();
        Handler errorHandler = Handlers.status((int)503);
        Handler streamHandler = StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT);
        Handler errorThenSuccess = Handlers.sequential((Handler[])new Handler[]{errorHandler, streamHandler});
        try (HttpServer server = HttpServer.start((Handler)errorThenSuccess);
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri(), acc);){
            StreamProcessorTest.startAndWait(sp);
            long timeAfterOpen = System.currentTimeMillis();
            LDValue event = acc.createEventAndReset(0L, 0L).getJsonValue();
            LDValue streamInits = event.get("streamInits");
            Assert.assertEquals((long)2L, (long)streamInits.size());
            LDValue init0 = streamInits.get(0);
            Assert.assertTrue((boolean)init0.get("failed").booleanValue());
            MatcherAssert.assertThat((Object)init0.get("timestamp").longValue(), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(startTime)), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(timeAfterOpen))));
            MatcherAssert.assertThat((Object)init0.get("durationMillis").longValue(), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(timeAfterOpen - startTime)));
            LDValue init1 = streamInits.get(1);
            Assert.assertFalse((boolean)init1.get("failed").booleanValue());
            MatcherAssert.assertThat((Object)init1.get("timestamp").longValue(), (Matcher)Matchers.allOf((Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(init0.get("timestamp").longValue())), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(timeAfterOpen))));
        }
    }

    @Test
    public void http400ErrorIsRecoverable() throws Exception {
        this.testRecoverableHttpError(400);
    }

    @Test
    public void http401ErrorIsUnrecoverable() throws Exception {
        this.testUnrecoverableHttpError(401);
    }

    @Test
    public void http403ErrorIsUnrecoverable() throws Exception {
        this.testUnrecoverableHttpError(403);
    }

    @Test
    public void http408ErrorIsRecoverable() throws Exception {
        this.testRecoverableHttpError(408);
    }

    @Test
    public void http429ErrorIsRecoverable() throws Exception {
        this.testRecoverableHttpError(429);
    }

    @Test
    public void http500ErrorIsRecoverable() throws Exception {
        this.testRecoverableHttpError(500);
    }

    @Test
    public void putEventWithInvalidJsonCausesStreamRestart() throws Exception {
        this.verifyEventCausesStreamRestart("put", "{sorry", DataSourceStatusProvider.ErrorKind.INVALID_DATA);
    }

    @Test
    public void putEventWithWellFormedJsonButInvalidDataCausesStreamRestart() throws Exception {
        this.verifyEventCausesStreamRestart("put", "{\"data\":{\"flags\":3}}", DataSourceStatusProvider.ErrorKind.INVALID_DATA);
    }

    @Test
    public void patchEventWithInvalidJsonCausesStreamRestart() throws Exception {
        this.verifyEventCausesStreamRestart("patch", "{sorry", DataSourceStatusProvider.ErrorKind.INVALID_DATA);
    }

    @Test
    public void patchEventWithWellFormedJsonButInvalidDataCausesStreamRestart() throws Exception {
        this.verifyEventCausesStreamRestart("patch", "{\"path\":\"/flags/flagkey\", \"data\":{\"rules\":3}}", DataSourceStatusProvider.ErrorKind.INVALID_DATA);
    }

    @Test
    public void patchEventWithInvalidPathCausesNoStreamRestart() throws Exception {
        this.verifyEventCausesNoStreamRestart("patch", "{\"path\":\"/wrong\", \"data\":{\"key\":\"flagkey\"}}");
    }

    @Test
    public void patchEventWithNullPathCausesStreamRestart() throws Exception {
        this.verifyEventCausesStreamRestart("patch", "{\"path\":null, \"data\":{\"key\":\"flagkey\"}}", DataSourceStatusProvider.ErrorKind.INVALID_DATA);
    }

    @Test
    public void deleteEventWithInvalidJsonCausesStreamRestart() throws Exception {
        this.verifyEventCausesStreamRestart("delete", "{sorry", DataSourceStatusProvider.ErrorKind.INVALID_DATA);
    }

    @Test
    public void deleteEventWithInvalidPathCausesNoStreamRestart() throws Exception {
        this.verifyEventCausesNoStreamRestart("delete", "{\"path\":\"/wrong\", \"version\":1}");
    }

    @Test
    public void indirectPatchEventWithInvalidPathDoesNotCauseStreamRestart() throws Exception {
        this.verifyEventCausesNoStreamRestart("indirect/patch", "/wrong");
    }

    @Test
    public void restartsStreamIfStoreNeedsRefresh() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            StreamProcessorTest.startAndWait(sp);
            this.dataSourceUpdates.awaitInit();
            server.getRecorder().requireRequest();
            this.dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(false, false));
            this.dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(true, true));
            this.dataSourceUpdates.awaitInit();
            server.getRecorder().requireRequest();
            server.getRecorder().requireNoRequests(Duration.ofMillis(100L));
        }
    }

    @Test
    public void doesNotRestartStreamIfStoreHadOutageButDoesNotNeedRefresh() throws Exception {
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            StreamProcessorTest.startAndWait(sp);
            this.dataSourceUpdates.awaitInit();
            server.getRecorder().requireRequest();
            this.dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(false, false));
            this.dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(true, false));
            server.getRecorder().requireNoRequests(Duration.ofMillis(100L));
        }
    }

    private void verifyStoreErrorCausesStreamRestart(String eventName, String eventData) throws Exception {
        AtomicInteger updateCount = new AtomicInteger(0);
        Runnable preUpdateHook = () -> {
            int count = updateCount.incrementAndGet();
            if (count == 2) {
                throw new RuntimeException("sorry");
            }
        };
        TestComponents.DelegatingDataStore delegatingStore = new TestComponents.DelegatingDataStore((DataStore)this.dataStore, preUpdateHook);
        this.dataStoreStatusProvider = new TestComponents.MockDataStoreStatusProvider(false);
        this.dataSourceUpdates = TestComponents.dataSourceUpdates(delegatingStore, this.dataStoreStatusProvider);
        this.verifyEventCausesStreamRestart(eventName, eventData, DataSourceStatusProvider.ErrorKind.STORE_ERROR);
    }

    @Test
    public void storeFailureOnPutCausesStreamRestart() throws Exception {
        this.verifyStoreErrorCausesStreamRestart("put", this.emptyPutEvent().getData());
    }

    @Test
    public void storeFailureOnPatchCausesStreamRestart() throws Exception {
        String patchData = "{\"path\":\"/flags/flagkey\",\"data\":{\"key\":\"flagkey\",\"version\":1}}";
        this.verifyStoreErrorCausesStreamRestart("patch", patchData);
    }

    @Test
    public void storeFailureOnDeleteCausesStreamRestart() throws Exception {
        String deleteData = "{\"path\":\"/flags/flagkey\",\"version\":1}";
        this.verifyStoreErrorCausesStreamRestart("delete", deleteData);
    }

    @Test
    public void sseCommentIsIgnored() throws Exception {
        LinkedBlockingQueue<String> events = new LinkedBlockingQueue<String>();
        events.add(EMPTY_DATA_EVENT);
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponseFromQueue(events));){
            try (StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
                StreamProcessorTest.startAndWait(sp);
                events.add(": this is a comment");
                events.add(StreamProcessorTest.makePatchEvent("/flags/" + FEATURE.getKey(), DataModel.FEATURES, (DataModel.VersionedData)FEATURE));
                this.dataSourceUpdates.awaitUpsert();
            }
            MatcherAssert.assertThat((Object)server.getRecorder().count(), (Matcher)Matchers.equalTo((Object)1));
            MatcherAssert.assertThat((Object)this.dataSourceUpdates.getLastStatus().getLastError(), (Matcher)Matchers.nullValue());
        }
    }

    private void verifyEventCausesNoStreamRestart(String eventName, String eventData) throws Exception {
        LinkedBlockingQueue<String> events = new LinkedBlockingQueue<String>();
        events.add(EMPTY_DATA_EVENT);
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponseFromQueue(events));){
            try (StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
                StreamProcessorTest.startAndWait(sp);
                events.add(StreamProcessorTest.makeEvent(eventName, eventData));
                events.add(StreamProcessorTest.makePatchEvent("/flags/" + FEATURE.getKey(), DataModel.FEATURES, (DataModel.VersionedData)FEATURE));
                this.dataSourceUpdates.awaitUpsert();
            }
            MatcherAssert.assertThat((Object)server.getRecorder().count(), (Matcher)Matchers.equalTo((Object)1));
            MatcherAssert.assertThat((Object)this.dataSourceUpdates.getLastStatus().getLastError(), (Matcher)Matchers.nullValue());
        }
    }

    private void verifyEventCausesStreamRestart(String eventName, String eventData, DataSourceStatusProvider.ErrorKind expectedError) throws Exception {
        LinkedBlockingQueue<DataSourceStatusProvider.Status> statuses = new LinkedBlockingQueue<DataSourceStatusProvider.Status>();
        this.dataSourceUpdates.statusBroadcaster.register(statuses::add);
        LinkedBlockingQueue<String> events = new LinkedBlockingQueue<String>();
        events.add(EMPTY_DATA_EVENT);
        try (HttpServer server = HttpServer.start((Handler)StreamProcessorTest.streamResponseFromQueue(events));
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            sp.start();
            this.dataSourceUpdates.awaitInit();
            server.getRecorder().requireRequest();
            TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.VALID);
            events.add(StreamProcessorTest.makeEvent(eventName, eventData));
            events.add(EMPTY_DATA_EVENT);
            server.getRecorder().requireRequest();
            this.dataSourceUpdates.awaitInit();
            DataSourceStatusProvider.Status status = TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.INTERRUPTED);
            MatcherAssert.assertThat((Object)status.getLastError(), (Matcher)Matchers.notNullValue());
            MatcherAssert.assertThat((Object)status.getLastError().getKind(), (Matcher)Matchers.equalTo((Object)expectedError));
            TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.VALID);
        }
    }

    @Test
    public void testSpecialHttpConfigurations() throws Exception {
        Handler handler = StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT);
        SpecialHttpConfigurations.testAll((Handler)handler, (serverUri, params) -> {
            LDConfig config = new LDConfig.Builder().http((ComponentConfigurer)TestUtil.makeHttpConfigurationFromTestParams(params)).build();
            ConnectionErrorSink errorSink = new ConnectionErrorSink();
            try (StreamProcessor sp = this.createStreamProcessor(config, serverUri);){
                sp.connectionErrorHandler = errorSink;
                StreamProcessorTest.startAndWait(sp);
                if (errorSink.errors.size() != 0) {
                    throw new IOException((Throwable)errorSink.errors.peek());
                }
                boolean bl = true;
                return bl;
            }
        });
    }

    private void testUnrecoverableHttpError(int statusCode) throws Exception {
        Handler errorResp = Handlers.status((int)statusCode);
        LinkedBlockingQueue<DataSourceStatusProvider.Status> statuses = new LinkedBlockingQueue<DataSourceStatusProvider.Status>();
        this.dataSourceUpdates.statusBroadcaster.register(statuses::add);
        try (HttpServer server = HttpServer.start((Handler)errorResp);
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            Future initFuture = sp.start();
            ConcurrentHelpers.assertFutureIsCompleted((Future)initFuture, (long)2L, (TimeUnit)TimeUnit.SECONDS);
            Assert.assertFalse((boolean)sp.isInitialized());
            DataSourceStatusProvider.Status newStatus = TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.OFF);
            Assert.assertEquals((Object)DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, (Object)newStatus.getLastError().getKind());
            Assert.assertEquals((long)statusCode, (long)newStatus.getLastError().getStatusCode());
            server.getRecorder().requireRequest();
            server.getRecorder().requireNoRequests(Duration.ofMillis(50L));
        }
    }

    private void testRecoverableHttpError(int statusCode) throws Exception {
        Semaphore closeFirstStreamSignal = new Semaphore(0);
        Handler errorResp = Handlers.status((int)statusCode);
        Handler stream1Resp = StreamProcessorTest.closableStreamResponse(EMPTY_DATA_EVENT, closeFirstStreamSignal);
        Handler stream2Resp = StreamProcessorTest.streamResponse(EMPTY_DATA_EVENT);
        Handler seriesOfResponses = Handlers.sequential((Handler[])new Handler[]{errorResp, stream1Resp, errorResp, stream2Resp});
        LinkedBlockingQueue<DataSourceStatusProvider.Status> statuses = new LinkedBlockingQueue<DataSourceStatusProvider.Status>();
        this.dataSourceUpdates.statusBroadcaster.register(statuses::add);
        try (HttpServer server = HttpServer.start((Handler)seriesOfResponses);
             StreamProcessor sp = this.createStreamProcessor(null, server.getUri());){
            Future initFuture = sp.start();
            ConcurrentHelpers.assertFutureIsCompleted((Future)initFuture, (long)2L, (TimeUnit)TimeUnit.SECONDS);
            Assert.assertTrue((boolean)sp.isInitialized());
            DataSourceStatusProvider.Status failureStatus1 = TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.INITIALIZING);
            Assert.assertEquals((Object)DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, (Object)failureStatus1.getLastError().getKind());
            Assert.assertEquals((long)statusCode, (long)failureStatus1.getLastError().getStatusCode());
            DataSourceStatusProvider.Status successStatus1 = TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.VALID);
            Assert.assertSame((Object)failureStatus1.getLastError(), (Object)successStatus1.getLastError());
            closeFirstStreamSignal.release();
            DataSourceStatusProvider.Status failureStatus2 = TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.INTERRUPTED);
            Assert.assertEquals((Object)DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, (Object)failureStatus2.getLastError().getKind());
            DataSourceStatusProvider.Status failureStatus3 = TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.INTERRUPTED);
            Assert.assertEquals((Object)DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, (Object)failureStatus3.getLastError().getKind());
            Assert.assertEquals((long)statusCode, (long)failureStatus3.getLastError().getStatusCode());
            DataSourceStatusProvider.Status successStatus2 = TestUtil.requireDataSourceStatus(statuses, DataSourceStatusProvider.State.VALID);
            Assert.assertSame((Object)failureStatus3.getLastError(), (Object)successStatus2.getLastError());
        }
    }

    private StreamProcessor createStreamProcessor(URI streamUri) {
        return this.createStreamProcessor(LDConfig.DEFAULT, streamUri, null);
    }

    private StreamProcessor createStreamProcessor(LDConfig config, URI streamUri, DiagnosticStore acc) {
        return new StreamProcessor(ComponentsImpl.toHttpProperties((HttpConfiguration)TestComponents.clientContext(SDK_KEY, config == null ? LDConfig.DEFAULT : config).getHttp()), (DataSourceUpdateSink)this.dataSourceUpdates, 1, acc, streamUri, BRIEF_RECONNECT_DELAY, this.testLogger);
    }

    private StreamProcessor createStreamProcessor(LDConfig config, URI streamUri) {
        return this.createStreamProcessor(config, streamUri, null);
    }

    private static void startAndWait(StreamProcessor sp) {
        Future ready = sp.start();
        try {
            ready.get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private MessageEvent emptyPutEvent() {
        return new MessageEvent("{\"data\":{\"flags\":{},\"segments\":{}}}");
    }

    private void assertFeatureInStore(DataModel.FeatureFlag feature) {
        Assert.assertEquals((long)feature.getVersion(), (long)this.dataStore.get(DataModel.FEATURES, feature.getKey()).getVersion());
    }

    private void assertSegmentInStore(DataModel.Segment segment) {
        Assert.assertEquals((long)segment.getVersion(), (long)this.dataStore.get(DataModel.SEGMENTS, segment.getKey()).getVersion());
    }

    static class ConnectionErrorSink
    implements ConnectionErrorHandler {
        final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<Throwable>();

        ConnectionErrorSink() {
        }

        public ConnectionErrorHandler.Action onConnectionError(Throwable t) {
            if (!(t instanceof EOFException)) {
                this.errors.add(t);
            }
            return ConnectionErrorHandler.Action.SHUTDOWN;
        }
    }
}

