/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DirectDruidClientFactory;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.SegmentLoadInfo;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Exercises {@link CoordinatorServerView} on top of a {@link BatchServerInventoryView}: segments are
 * announced and unannounced for one or several servers, and the resulting timeline is inspected.
 *
 * <p>The test is parameterized on whether a {@link DirectDruidClientFactory} is handed to the
 * {@link CoordinatorServerView}; query runners are expected to be present only when it is.
 */
@RunWith(value=Parameterized.class)
public class CoordinatorServerViewTest
extends CuratorTestBase {
    /** Datasource used by every segment created in this test. */
    private static final String DATA_SOURCE = "test_overlord_server_view";

    private ObjectMapper jsonMapper;
    private ZkPathsConfig zkPathsConfig;
    private String inventoryPath;

    // Counted down by the SegmentCallback wrapper installed on the base inventory view (see setupViews).
    private CountDownLatch segmentViewInitLatch;
    private CountDownLatch segmentAddedLatch;
    private CountDownLatch segmentRemovedLatch;

    // Counted down by the TimelineCallback registered on the CoordinatorServerView
    // (see initServerViewTimelineCallback).
    private CountDownLatch callbackSegmentViewInitLatch;
    private CountDownLatch callbackSegmentAddedLatch;
    private CountDownLatch callbackSegmentRemovedLatch;
    private CountDownLatch callbackServerSegmentRemovedLatch;

    private BatchServerInventoryView baseView;
    private CoordinatorServerView coordinatorServerView;
    private ExecutorService callbackExec;
    private final boolean setDruidClientFactory;

    /**
     * Supplies the values for the single constructor parameter: with and without a client factory.
     *
     * @return the parameter values {@code true} and {@code false}
     */
    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{true, false};
    }

    /**
     * @param setDruidClientFactory whether a mocked {@link DirectDruidClientFactory} is passed to the
     *                              {@link CoordinatorServerView} under test
     */
    public CoordinatorServerViewTest(boolean setDruidClientFactory) {
        this.setDruidClientFactory = setDruidClientFactory;
    }

    /**
     * Creates the per-test collaborators (JSON mapper, ZK path config, callback executor), then sets up
     * the server and curator via the base class, starts the curator and blocks until it is connected.
     */
    @Before
    public void setUp() throws Exception {
        jsonMapper = TestHelper.makeJsonMapper();
        zkPathsConfig = new ZkPathsConfig();
        inventoryPath = zkPathsConfig.getLiveSegmentsPath();
        callbackExec = Execs.singleThreaded("CoordinatorServerViewTest-%s");
        setupServerAndCurator();
        curator.start();
        curator.blockUntilConnected();
    }

    /**
     * Announces one segment on one server, verifies it shows up in the timeline, then unannounces it and
     * verifies the timeline no longer contains it.
     */
    @Test
    public void testSingleServerAddedRemovedSegment() throws Exception {
        initLatches(1);
        setupViews(setDruidClientFactory);

        DruidServer druidServer = new DruidServer("localhost:1234", "localhost:1234", null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
        setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);

        DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
        int partition = segment.getShardSpec().getPartitionNum();
        Interval interval = Intervals.of("2014-10-20T00:00:00Z/P1D");
        announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);

        assertLatchCountedDown(segmentViewInitLatch);
        assertLatchCountedDown(segmentAddedLatch);
        assertLatchCountedDown(callbackSegmentViewInitLatch);
        assertLatchCountedDown(callbackSegmentAddedLatch);
        assertQueryRunnerMatchesFactorySetting(druidServer);

        // The timeline is handled through its raw type here, so element accesses below use explicit casts.
        VersionedIntervalTimeline timeline = coordinatorServerView.getTimeline(new TableDataSource(DATA_SOURCE));
        List serverLookupRes = timeline.lookup(interval);
        Assert.assertEquals(1L, serverLookupRes.size());

        TimelineObjectHolder actualTimelineObjectHolder = (TimelineObjectHolder)serverLookupRes.get(0);
        Assert.assertEquals(interval, actualTimelineObjectHolder.getInterval());
        Assert.assertEquals("v1", actualTimelineObjectHolder.getVersion());

        PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject();
        Assert.assertTrue(actualPartitionHolder.isComplete());
        Assert.assertEquals(1L, Iterables.size(actualPartitionHolder));

        SegmentLoadInfo segmentLoadInfo = (SegmentLoadInfo)((PartitionChunk)actualPartitionHolder.iterator().next()).getObject();
        Assert.assertFalse(segmentLoadInfo.isEmpty());
        Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
        Assert.assertNotNull(timeline.findChunk(interval, "v1", partition));

        unannounceSegmentForServer(druidServer, segment);
        assertLatchCountedDown(segmentRemovedLatch);
        assertLatchCountedDown(callbackServerSegmentRemovedLatch);
        assertLatchCountedDown(callbackSegmentRemovedLatch);

        Assert.assertEquals(0L, timeline.lookup(Intervals.of("2014-10-20T00:00:00Z/P1D")).size());
        Assert.assertNull(timeline.findChunk(interval, "v1", partition));
    }

    /**
     * Announces five overlapping segments, one per server, and checks the timeline lookup; then removes
     * the "v2" segment and checks the lookup again; finally removes the remaining four segments and
     * expects an empty lookup.
     */
    @Test
    public void testMultipleServerAddedRemovedSegment() throws Exception {
        initLatches(5);
        setupViews(setDruidClientFactory);

        // Lists.transform returns a lazy view: the function is re-applied on every element access.
        List<DruidServer> druidServers = Lists.transform(ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), new Function<String, DruidServer>(){

            public DruidServer apply(String input) {
                return new DruidServer(input, input, null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
            }
        });
        for (DruidServer druidServer : druidServers) {
            setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
        }

        List<DataSegment> segments = Lists.transform(ImmutableList.of(Pair.of("2011-04-01/2011-04-03", "v1"), Pair.of("2011-04-03/2011-04-06", "v1"), Pair.of("2011-04-01/2011-04-09", "v2"), Pair.of("2011-04-06/2011-04-09", "v3"), Pair.of("2011-04-01/2011-04-02", "v3")), new Function<Pair<String, String>, DataSegment>(){

            public DataSegment apply(Pair<String, String> input) {
                return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
            }
        });
        for (int i = 0; i < 5; ++i) {
            announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
        }

        assertLatchCountedDown(segmentViewInitLatch);
        assertLatchCountedDown(segmentAddedLatch);
        assertLatchCountedDown(callbackSegmentViewInitLatch);
        assertLatchCountedDown(callbackSegmentAddedLatch);
        for (int i = 0; i < 5; ++i) {
            assertQueryRunnerMatchesFactorySetting(druidServers.get(i));
        }

        // Raw type on purpose: assertValues takes a List<TimelineObjectHolder>, which the raw lookup
        // result converts to without a cast.
        VersionedIntervalTimeline timeline = coordinatorServerView.getTimeline(new TableDataSource(DATA_SOURCE));
        assertValues(
            Arrays.asList(
                createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
                createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
                createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
            ),
            timeline.lookup(Intervals.of("2011-04-01/2011-04-09"))
        );

        // Remove the "v2" segment.
        unannounceSegmentForServer(druidServers.get(2), segments.get(2));
        assertLatchCountedDown(segmentRemovedLatch);
        assertLatchCountedDown(callbackSegmentRemovedLatch);
        assertLatchCountedDown(callbackServerSegmentRemovedLatch);

        // Fresh latches for the four removals that follow; installed before those removals are triggered.
        segmentRemovedLatch = new CountDownLatch(4);
        callbackServerSegmentRemovedLatch = new CountDownLatch(4);
        callbackSegmentRemovedLatch = new CountDownLatch(4);

        timeline = coordinatorServerView.getTimeline(new TableDataSource(DATA_SOURCE));
        assertValues(
            Arrays.asList(
                createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
                createExpected("2011-04-02/2011-04-03", "v1", druidServers.get(0), segments.get(0)),
                createExpected("2011-04-03/2011-04-06", "v1", druidServers.get(1), segments.get(1)),
                createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
            ),
            timeline.lookup(Intervals.of("2011-04-01/2011-04-09"))
        );

        // Remove everything that is still announced (index 2 was already removed above).
        for (int i = 0; i < 5; ++i) {
            if (i == 2) {
                continue;
            }
            unannounceSegmentForServer(druidServers.get(i), segments.get(i));
        }
        assertLatchCountedDown(segmentRemovedLatch);
        assertLatchCountedDown(callbackSegmentRemovedLatch);
        assertLatchCountedDown(callbackServerSegmentRemovedLatch);

        Assert.assertEquals(0L, timeline.lookup(Intervals.of("2011-04-01/2011-04-09")).size());
    }

    /**
     * Creates all latches for a test: every latch starts at one, except the two "segment added" latches,
     * which start at {@code expectedSegmentsAdded}.
     */
    private void initLatches(int expectedSegmentsAdded) {
        segmentViewInitLatch = new CountDownLatch(1);
        segmentAddedLatch = new CountDownLatch(expectedSegmentsAdded);
        segmentRemovedLatch = new CountDownLatch(1);
        callbackSegmentViewInitLatch = new CountDownLatch(1);
        callbackSegmentAddedLatch = new CountDownLatch(expectedSegmentsAdded);
        callbackServerSegmentRemovedLatch = new CountDownLatch(1);
        callbackSegmentRemovedLatch = new CountDownLatch(1);
    }

    /** Fails the test unless {@code timing.forWaiting().awaitLatch(latch)} reports success. */
    private void assertLatchCountedDown(CountDownLatch latch) {
        Assert.assertTrue(timing.forWaiting().awaitLatch(latch));
    }

    /**
     * Asserts that a query runner exists for the server's name exactly when the test was parameterized
     * to pass a client factory to the view.
     */
    private void assertQueryRunnerMatchesFactorySetting(DruidServer druidServer) {
        if (setDruidClientFactory) {
            Assert.assertNotNull(coordinatorServerView.getQueryRunner(druidServer.getName()));
        } else {
            Assert.assertNull(coordinatorServerView.getQueryRunner(druidServer.getName()));
        }
    }

    /** Deletes the segment's node under {@code inventoryPath/<server host>/<segment id>}. */
    private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception {
        curator.delete().guaranteed().forPath(ZKPaths.makePath(inventoryPath, druidServer.getHost(), segment.getId().toString()));
    }

    /** Bundles an expected (interval, (version, (server, segment))) tuple for {@link #assertValues}. */
    private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(String intervalStr, String version, DruidServer druidServer, DataSegment segment) {
        return Pair.of(Intervals.of(intervalStr), Pair.of(version, Pair.of(druidServer, segment)));
    }

    /**
     * Compares timeline holders against expectations position by position: interval, version, a complete
     * single-chunk partition holder, and a non-empty {@link SegmentLoadInfo} whose only server (and the
     * one returned by {@code pickOne()}) is the expected server's metadata.
     *
     * @param expected tuples built with {@link #createExpected}
     * @param actual   holders returned by a timeline lookup
     */
    private void assertValues(List<Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>>> expected, List<TimelineObjectHolder> actual) {
        Assert.assertEquals((long)expected.size(), (long)actual.size());
        for (int i = 0; i < expected.size(); ++i) {
            Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i);
            String expectedVersion = expectedPair.rhs.lhs;
            DruidServer expectedServer = expectedPair.rhs.rhs.lhs;
            TimelineObjectHolder actualTimelineObjectHolder = actual.get(i);

            Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval());
            Assert.assertEquals(expectedVersion, actualTimelineObjectHolder.getVersion());

            PartitionHolder actualPartitionHolder = actualTimelineObjectHolder.getObject();
            Assert.assertTrue(actualPartitionHolder.isComplete());
            Assert.assertEquals(1L, Iterables.size(actualPartitionHolder));

            SegmentLoadInfo segmentLoadInfo = (SegmentLoadInfo)((PartitionChunk)actualPartitionHolder.iterator().next()).getObject();
            Assert.assertFalse(segmentLoadInfo.isEmpty());
            Assert.assertEquals(expectedServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
            Assert.assertEquals(expectedServer.getMetadata(), segmentLoadInfo.pickOne());
        }
    }

    /**
     * Builds and starts the base inventory view and the {@link CoordinatorServerView} under test.
     *
     * <p>The base view wraps every registered segment callback so that the delegate is invoked first and
     * the matching test latch is counted down afterwards.
     *
     * @param setDruidClientFactory when {@code true}, a mocked {@link DirectDruidClientFactory} that
     *                              returns a mocked {@link DirectDruidClient} is passed to the view;
     *                              otherwise {@code null} is passed
     */
    private void setupViews(boolean setDruidClientFactory) throws Exception {
        baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue(), "test"){

            public void registerSegmentCallback(Executor exec, final ServerView.SegmentCallback callback) {
                super.registerSegmentCallback(exec, new ServerView.SegmentCallback(){

                    public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                        ServerView.CallbackAction res = callback.segmentAdded(server, segment);
                        CoordinatorServerViewTest.this.segmentAddedLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                        ServerView.CallbackAction res = callback.segmentRemoved(server, segment);
                        CoordinatorServerViewTest.this.segmentRemovedLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentViewInitialized() {
                        ServerView.CallbackAction res = callback.segmentViewInitialized();
                        CoordinatorServerViewTest.this.segmentViewInitLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                        // Not forwarded to the delegate and not tracked by any latch.
                        return ServerView.CallbackAction.CONTINUE;
                    }
                });
            }
        };

        DirectDruidClientFactory druidClientFactory = null;
        if (setDruidClientFactory) {
            druidClientFactory = EasyMock.createMock(DirectDruidClientFactory.class);
            DirectDruidClient directDruidClient = EasyMock.mock(DirectDruidClient.class);
            EasyMock.expect(druidClientFactory.makeDirectClient(EasyMock.anyObject(DruidServer.class))).andReturn(directDruidClient).anyTimes();
            EasyMock.replay(druidClientFactory);
        }

        coordinatorServerView = new CoordinatorServerView(baseView, new CoordinatorSegmentWatcherConfig(), new NoopServiceEmitter(), druidClientFactory);
        baseView.start();
        initServerViewTimelineCallback(coordinatorServerView);
        coordinatorServerView.start();
    }

    /**
     * Registers, on {@code callbackExec}, a timeline callback that counts down the matching
     * {@code callback*} latch for each event and always returns {@code CONTINUE}.
     */
    private void initServerViewTimelineCallback(CoordinatorServerView serverView) {
        serverView.registerTimelineCallback(callbackExec, new TimelineServerView.TimelineCallback(){

            public ServerView.CallbackAction timelineInitialized() {
                CoordinatorServerViewTest.this.callbackSegmentViewInitLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                CoordinatorServerViewTest.this.callbackSegmentAddedLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentRemoved(DataSegment segment) {
                CoordinatorServerViewTest.this.callbackSegmentRemovedLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment) {
                CoordinatorServerViewTest.this.callbackServerSegmentRemovedLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    /**
     * Builds a zero-size segment of {@link #DATA_SOURCE} with the given interval and version, a local
     * load spec, no dimensions or metrics, and a {@link NoneShardSpec}.
     */
    private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version) {
        return DataSegment.builder()
            .dataSource(DATA_SOURCE)
            .interval(Intervals.of(intervalStr))
            .loadSpec(ImmutableMap.of("type", "local", "path", "somewhere"))
            .version(version)
            .dimensions(ImmutableList.of())
            .metrics(ImmutableList.of())
            .shardSpec(NoneShardSpec.instance())
            .binaryVersion(9)
            .size(0L)
            .build();
    }

    /**
     * Stops the base view, tears down the server and curator, and shuts down the callback executor.
     *
     * <p>Each step runs even if an earlier one throws, so a failure while stopping the view cannot leave
     * the curator running or the executor alive. The null checks cover tests that failed before the
     * corresponding object was created.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (baseView != null) {
                baseView.stop();
            }
        } finally {
            try {
                tearDownServerAndCurator();
            } finally {
                // setUp() creates a new executor for every test; without this it would never be released.
                if (callbackExec != null) {
                    callbackExec.shutdownNow();
                }
            }
        }
    }
}

