/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.HttpServerInventoryView;
import org.apache.druid.client.HttpServerInventoryViewConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TestChangeRequestHttpClient;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class HttpServerInventoryViewTest {
    private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
    private static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> TYPE_REF = HttpServerInventoryView.SEGMENT_LIST_RESP_TYPE_REF;
    private static final String EXEC_NAME_PREFIX = "InventoryViewTest";
    private static final String METRIC_SUCCESS = "serverview/sync/healthy";
    private static final String METRIC_UNSTABLE_TIME = "serverview/sync/unstableTime";
    private StubServiceEmitter serviceEmitter;
    private HttpServerInventoryView httpServerInventoryView;
    private TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>> httpClient;
    private TestExecutorFactory execHelper;
    private TestDruidNodeDiscovery druidNodeDiscovery;
    private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
    private Map<DruidServerMetadata, Set<DataSegment>> segmentsAddedToView;
    private Map<DruidServerMetadata, Set<DataSegment>> segmentsRemovedFromView;
    private Set<DruidServerMetadata> addedServers;
    private Set<DruidServerMetadata> removedServers;
    private AtomicBoolean inventoryInitialized;

    @Before
    public void setup() {
        this.serviceEmitter = new StubServiceEmitter("test", "localhost");
        EmittingLogger.registerEmitter((ServiceEmitter)this.serviceEmitter);
        this.druidNodeDiscovery = new TestDruidNodeDiscovery();
        this.druidNodeDiscoveryProvider = (DruidNodeDiscoveryProvider)EasyMock.createMock(DruidNodeDiscoveryProvider.class);
        EasyMock.expect((Object)this.druidNodeDiscoveryProvider.getForService("dataNodeService")).andReturn((Object)this.druidNodeDiscovery);
        EasyMock.replay((Object[])new Object[]{this.druidNodeDiscoveryProvider});
        this.httpClient = new TestChangeRequestHttpClient<ChangeRequestsSnapshot<DataSegmentChangeRequest>>(TYPE_REF, MAPPER);
        this.execHelper = new TestExecutorFactory();
        this.inventoryInitialized = new AtomicBoolean(false);
        this.segmentsAddedToView = new HashMap<DruidServerMetadata, Set<DataSegment>>();
        this.segmentsRemovedFromView = new HashMap<DruidServerMetadata, Set<DataSegment>>();
        this.addedServers = new HashSet<DruidServerMetadata>();
        this.removedServers = new HashSet<DruidServerMetadata>();
        this.createInventoryView(new HttpServerInventoryViewConfig(null, null, null));
    }

    @After
    public void tearDown() {
        EasyMock.verify((Object[])new Object[]{this.druidNodeDiscoveryProvider});
        if (this.httpServerInventoryView != null && this.httpServerInventoryView.isStarted()) {
            this.httpServerInventoryView.stop();
        }
    }

    @Test
    public void testInitHappensAfterNodeViewInit() {
        this.httpServerInventoryView.start();
        Assert.assertTrue((boolean)this.httpServerInventoryView.isStarted());
        Assert.assertFalse((boolean)this.inventoryInitialized.get());
        this.druidNodeDiscovery.markNodeViewInitialized();
        Assert.assertFalse((boolean)this.inventoryInitialized.get());
        this.execHelper.finishInventoryInitialization();
        Assert.assertTrue((boolean)this.inventoryInitialized.get());
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testStopShutsDownExecutors() {
        this.httpServerInventoryView.start();
        Assert.assertFalse((boolean)this.execHelper.syncExecutor.isShutdown());
        this.httpServerInventoryView.stop();
        Assert.assertTrue((boolean)this.execHelper.syncExecutor.isShutdown());
    }

    @Test
    public void testAddNodeStartsSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode druidNode = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        DruidServer server = druidNode.toDruidServer();
        Collection inventory = this.httpServerInventoryView.getInventory();
        Assert.assertEquals((long)1L, (long)inventory.size());
        Assert.assertTrue((boolean)inventory.contains(server));
        Assert.assertTrue((boolean)this.addedServers.contains(server.getMetadata()));
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, (Number)1);
        this.serviceEmitter.verifyNotEmitted(METRIC_UNSTABLE_TIME);
        DataSegment segment = CreateDataSegments.ofDatasource("wiki").eachOfSizeInMb(500L).get(0);
        this.httpClient.completeNextRequestWith(HttpServerInventoryViewTest.snapshotOf(new DataSegmentChangeRequest[]{new SegmentChangeRequestLoad(segment)}));
        this.execHelper.sendSyncRequestAndHandleResponse();
        DruidServer inventoryValue = this.httpServerInventoryView.getInventoryValue(server.getName());
        Assert.assertNotNull((Object)inventoryValue);
        Assert.assertEquals((long)1L, (long)inventoryValue.getTotalSegments());
        Assert.assertNotNull((Object)inventoryValue.getSegment(segment.getId()));
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testRemoveNodeStopsSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode druidNode = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        DruidServer server = druidNode.toDruidServer();
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(druidNode);
        Assert.assertNull((Object)this.httpServerInventoryView.getInventoryValue(server.getName()));
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyNotEmitted(METRIC_SUCCESS);
        this.serviceEmitter.verifyNotEmitted(METRIC_UNSTABLE_TIME);
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testAddNodeTriggersServerAddedCallback() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode druidNode = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        DruidServer server = druidNode.toDruidServer();
        Assert.assertTrue((boolean)this.addedServers.contains(server.getMetadata()));
        this.httpServerInventoryView.stop();
    }

    @Test(timeout=60000L)
    public void testSyncSegmentLoadAndDrop() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode druidNode = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        DruidServer server = druidNode.toDruidServer();
        Assert.assertTrue((boolean)this.addedServers.contains(server.getMetadata()));
        DataSegment[] segments = CreateDataSegments.ofDatasource("wiki").forIntervals(4, Granularities.DAY).eachOfSizeInMb(500L).toArray(new DataSegment[0]);
        this.httpClient.completeNextRequestWith(HttpServerInventoryViewTest.snapshotOf(new DataSegmentChangeRequest[]{new SegmentChangeRequestLoad(segments[0])}));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue((boolean)this.isAddedToView(server, segments[0]));
        this.resetForNextSyncRequest();
        this.httpClient.completeNextRequestWith(HttpServerInventoryViewTest.snapshotOf(new DataSegmentChangeRequest[]{new SegmentChangeRequestDrop(segments[0]), new SegmentChangeRequestLoad(segments[1]), new SegmentChangeRequestLoad(segments[2])}));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue((boolean)this.isRemovedFromView(server, segments[0]));
        Assert.assertTrue((boolean)this.isAddedToView(server, segments[1]));
        Assert.assertTrue((boolean)this.isAddedToView(server, segments[2]));
        this.resetForNextSyncRequest();
        this.httpClient.completeNextRequestWith((ChangeRequestsSnapshot<DataSegmentChangeRequest>)new ChangeRequestsSnapshot(true, "Server requested reset", ChangeRequestHistory.Counter.ZERO, Collections.emptyList()));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue((boolean)this.segmentsAddedToView.isEmpty());
        Assert.assertTrue((boolean)this.segmentsRemovedFromView.isEmpty());
        this.resetForNextSyncRequest();
        this.httpClient.completeNextRequestWith(HttpServerInventoryViewTest.snapshotOf(new DataSegmentChangeRequest[]{new SegmentChangeRequestLoad(segments[2]), new SegmentChangeRequestLoad(segments[3])}));
        this.execHelper.sendSyncRequestAndHandleResponse();
        Assert.assertTrue((boolean)this.isRemovedFromView(server, segments[1]));
        Assert.assertTrue((boolean)this.isAddedToView(server, segments[3]));
        DruidServer inventoryValue = this.httpServerInventoryView.getInventoryValue(server.getName());
        Assert.assertNotNull((Object)inventoryValue);
        Assert.assertEquals((long)2L, (long)inventoryValue.getTotalSegments());
        Assert.assertNotNull((Object)inventoryValue.getSegment(segments[2].getId()));
        Assert.assertNotNull((Object)inventoryValue.getSegment(segments[3].getId()));
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(druidNode);
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(new DiscoveryDruidNode(new DruidNode("service", "host", false, Integer.valueOf(8080), null, true, false), NodeRole.INDEXER, Collections.emptyMap()));
        this.druidNodeDiscovery.removeNodesAndNotifyListeners(new DiscoveryDruidNode(new DruidNode("service", "host", false, Integer.valueOf(8080), null, true, false), NodeRole.INDEXER, (Map)ImmutableMap.of((Object)"dataNodeService", (Object)new LookupNodeService("lookyloo"))));
        Assert.assertTrue((boolean)this.removedServers.contains(server.getMetadata()));
        Assert.assertNull((Object)this.httpServerInventoryView.getInventoryValue(server.getName()));
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testSyncWhenRequestFailedToSend() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode druidNode = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        Assert.assertTrue((boolean)this.addedServers.contains(druidNode.toDruidServer().getMetadata()));
        this.httpClient.failToSendNextRequestWith((RuntimeException)new ISE("Could not send request to server", new Object[0]));
        this.execHelper.sendSyncRequest();
        this.serviceEmitter.flush();
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, (Number)0);
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testSyncWhenErrorResponse() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        this.httpClient.completeNextRequestWith(InvalidInput.exception((String)"failure on server", (Object[])new Object[0]));
        this.execHelper.sendSyncRequestAndHandleResponse();
        this.serviceEmitter.flush();
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, (Number)0);
        this.httpServerInventoryView.stop();
    }

    @Test
    public void testUnstableServerAlertsAfterTimeout() {
        this.createInventoryView(new HttpServerInventoryViewConfig(null, Period.millis((int)0), null));
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        this.execHelper.finishInventoryInitialization();
        DiscoveryDruidNode druidNode = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        Assert.assertTrue((boolean)this.addedServers.contains(druidNode.toDruidServer().getMetadata()));
        this.serviceEmitter.flush();
        this.httpClient.completeNextRequestWith(InvalidInput.exception((String)"failure on server", (Object[])new Object[0]));
        this.execHelper.sendSyncRequestAndHandleResponse();
        List alerts = this.serviceEmitter.getAlerts();
        Assert.assertEquals((long)1L, (long)alerts.size());
        AlertEvent alert = (AlertEvent)alerts.get(0);
        Assert.assertTrue((boolean)alert.getDescription().contains("Sync failed for server"));
        this.serviceEmitter.flush();
        this.execHelper.emitMetrics();
        this.serviceEmitter.verifyValue(METRIC_SUCCESS, (Number)0);
        this.httpServerInventoryView.stop();
    }

    @Test(timeout=60000L)
    public void testInitWaitsForServerToSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        DiscoveryDruidNode druidNode = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        DruidServer server = druidNode.toDruidServer();
        Assert.assertTrue((boolean)this.addedServers.contains(server.getMetadata()));
        ExecutorService initExecutor = Execs.singleThreaded((String)"InventoryViewTest-init");
        try {
            initExecutor.submit(() -> this.execHelper.finishInventoryInitialization());
            Thread.sleep(1000L);
            Assert.assertFalse((boolean)this.inventoryInitialized.get());
            this.httpClient.completeNextRequestWith(HttpServerInventoryViewTest.snapshotOf(new DataSegmentChangeRequest[0]));
            this.execHelper.sendSyncRequestAndHandleResponse();
            Thread.sleep(10000L);
            Assert.assertTrue((boolean)this.inventoryInitialized.get());
        }
        catch (InterruptedException e) {
            throw new ISE((Throwable)e, "Interrupted", new Object[0]);
        }
        finally {
            initExecutor.shutdownNow();
        }
    }

    @Test(timeout=60000L)
    public void testInitDoesNotWaitForRemovedServerToSync() {
        this.httpServerInventoryView.start();
        this.druidNodeDiscovery.markNodeViewInitialized();
        DiscoveryDruidNode node = this.druidNodeDiscovery.addNodeAndNotifyListeners("localhost");
        Assert.assertTrue((boolean)this.addedServers.contains(node.toDruidServer().getMetadata()));
        ExecutorService initExecutor = Execs.singleThreaded((String)"InventoryViewTest-init");
        try {
            initExecutor.submit(() -> this.execHelper.finishInventoryInitialization());
            Thread.sleep(1000L);
            Assert.assertFalse((boolean)this.inventoryInitialized.get());
            this.druidNodeDiscovery.removeNodesAndNotifyListeners(node);
            Thread.sleep(10000L);
            Assert.assertTrue((boolean)this.inventoryInitialized.get());
        }
        catch (InterruptedException e) {
            throw new ISE((Throwable)e, "Interrupted", new Object[0]);
        }
        finally {
            initExecutor.shutdownNow();
        }
    }

    private void createInventoryView(HttpServerInventoryViewConfig config) {
        this.httpServerInventoryView = new HttpServerInventoryView(MAPPER, this.httpClient, this.druidNodeDiscoveryProvider, pair -> !((DataSegment)pair.rhs).getDataSource().equals("non-loading-datasource"), config, (ServiceEmitter)this.serviceEmitter, (ScheduledExecutorFactory)this.execHelper, EXEC_NAME_PREFIX);
        this.httpServerInventoryView.registerSegmentCallback((Executor)Execs.directExecutor(), new ServerView.SegmentCallback(){

            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                HttpServerInventoryViewTest.this.segmentsAddedToView.computeIfAbsent(server, s -> new HashSet()).add(segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                HttpServerInventoryViewTest.this.segmentsRemovedFromView.computeIfAbsent(server, s -> new HashSet()).add(segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentViewInitialized() {
                HttpServerInventoryViewTest.this.inventoryInitialized.set(true);
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                return ServerView.CallbackAction.CONTINUE;
            }
        });
        this.httpServerInventoryView.registerServerCallback((Executor)Execs.directExecutor(), new ServerView.ServerCallback(){

            public ServerView.CallbackAction serverAdded(DruidServer server) {
                HttpServerInventoryViewTest.this.addedServers.add(server.getMetadata());
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction serverRemoved(DruidServer server) {
                HttpServerInventoryViewTest.this.removedServers.add(server.getMetadata());
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    private boolean isAddedToView(DruidServer server, DataSegment segment) {
        return this.segmentsAddedToView.getOrDefault(server.getMetadata(), Collections.emptySet()).contains(segment);
    }

    private boolean isRemovedFromView(DruidServer server, DataSegment segment) {
        return this.segmentsRemovedFromView.getOrDefault(server.getMetadata(), Collections.emptySet()).contains(segment);
    }

    private void resetForNextSyncRequest() {
        this.segmentsAddedToView.clear();
        this.segmentsRemovedFromView.clear();
    }

    private static ChangeRequestsSnapshot<DataSegmentChangeRequest> snapshotOf(DataSegmentChangeRequest ... requests) {
        return ChangeRequestsSnapshot.success((ChangeRequestHistory.Counter)ChangeRequestHistory.Counter.ZERO, Arrays.asList(requests));
    }

    private static class TestDruidNodeDiscovery
    implements DruidNodeDiscovery {
        DruidNodeDiscovery.Listener listener;

        private TestDruidNodeDiscovery() {
        }

        public Collection<DiscoveryDruidNode> getAllNodes() {
            throw new UnsupportedOperationException("Not Implemented.");
        }

        public void registerListener(DruidNodeDiscovery.Listener listener) {
            this.listener = listener;
        }

        void markNodeViewInitialized() {
            this.listener.nodeViewInitialized();
        }

        DiscoveryDruidNode addNodeAndNotifyListeners(String host) {
            DruidNode druidNode = new DruidNode("druid/historical", host, false, Integer.valueOf(8080), null, true, false);
            DataNodeService dataNodeService = new DataNodeService("tier", 0x280000000L, ServerType.HISTORICAL, 0);
            DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, NodeRole.HISTORICAL, (Map)ImmutableMap.of((Object)"dataNodeService", (Object)dataNodeService));
            this.listener.nodesAdded((Collection)ImmutableList.of((Object)discoveryDruidNode));
            return discoveryDruidNode;
        }

        void removeNodesAndNotifyListeners(DiscoveryDruidNode ... nodesToRemove) {
            this.listener.nodesRemoved(Arrays.asList(nodesToRemove));
        }
    }

    private static class TestExecutorFactory
    implements ScheduledExecutorFactory {
        private BlockingExecutorService syncExecutor;
        private BlockingExecutorService monitorExecutor;

        private TestExecutorFactory() {
        }

        public ScheduledExecutorService create(int corePoolSize, String nameFormat) {
            BlockingExecutorService executorService = new BlockingExecutorService(nameFormat);
            String syncExecutorPrefix = "InventoryViewTest-%s";
            String monitorExecutorPrefix = "InventoryViewTest-monitor-%s";
            if ("InventoryViewTest-%s".equals(nameFormat)) {
                this.syncExecutor = executorService;
            } else if ("InventoryViewTest-monitor-%s".equals(nameFormat)) {
                this.monitorExecutor = executorService;
            }
            return new WrappingScheduledExecutorService(nameFormat, executorService, false);
        }

        void sendSyncRequestAndHandleResponse() {
            this.syncExecutor.finishNextPendingTasks(2);
        }

        void sendSyncRequest() {
            this.syncExecutor.finishNextPendingTask();
        }

        void finishInventoryInitialization() {
            this.syncExecutor.finishNextPendingTask();
        }

        void emitMetrics() {
            this.monitorExecutor.finishNextPendingTasks(2);
        }
    }
}

