/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

public class CuratorDruidCoordinatorTest
extends CuratorTestBase {
    private DataSourcesSnapshot dataSourcesSnapshot;
    private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
    private LoadQueuePeon sourceLoadQueuePeon;
    private LoadQueuePeon destinationLoadQueuePeon;
    private PathChildrenCache sourceLoadQueueChildrenCache;
    private PathChildrenCache destinationLoadQueueChildrenCache;
    private static final String SEGPATH = "/druid/segments";
    private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
    private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
    private static final long COORDINATOR_START_DELAY = 1L;
    private static final long COORDINATOR_PERIOD = 100L;
    private BatchServerInventoryView baseView;
    private CoordinatorServerView serverView;
    private CountDownLatch segmentViewInitLatch;
    private volatile CountDownLatch segmentAddedLatch;
    private volatile CountDownLatch segmentRemovedLatch;
    private final ObjectMapper jsonMapper;
    private final ZkPathsConfig zkPathsConfig;
    private final ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded((String)"Master-PeonExec--%d");
    private final ExecutorService callbackExec = Execs.multiThreaded((int)4, (String)"LoadQueuePeon-callbackexec--%d");
    @Rule
    public final TestRule timeout = new DeadlockDetectingTimeout(60L, TimeUnit.SECONDS);

    public CuratorDruidCoordinatorTest() {
        this.jsonMapper = TestHelper.makeJsonMapper();
        this.zkPathsConfig = new ZkPathsConfig();
    }

    @Before
    public void setUp() throws Exception {
        this.dataSourcesSnapshot = (DataSourcesSnapshot)EasyMock.createNiceMock(DataSourcesSnapshot.class);
        this.coordinatorRuntimeParams = (DruidCoordinatorRuntimeParams)EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
        this.setupServerAndCurator();
        this.curator.start();
        this.curator.blockUntilConnected();
        this.curator.create().creatingParentsIfNeeded().forPath(SEGPATH);
        this.curator.create().creatingParentsIfNeeded().forPath(SOURCE_LOAD_PATH);
        this.curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
        DefaultObjectMapper objectMapper = new DefaultObjectMapper();
        TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder().withCoordinatorStartDelay(new Duration(1L)).withCoordinatorPeriod(new Duration(100L)).withCoordinatorKillPeriod(new Duration(100L)).withCoordinatorKillMaxSegments(10).withCoordinatorKillIgnoreDurationToRetain(false).build();
        this.sourceLoadQueueChildrenCache = new PathChildrenCache(this.curator, SOURCE_LOAD_PATH, true, true, Execs.singleThreaded((String)"coordinator_test_path_children_cache_src-%d"));
        this.destinationLoadQueueChildrenCache = new PathChildrenCache(this.curator, DESTINATION_LOAD_PATH, true, true, Execs.singleThreaded((String)"coordinator_test_path_children_cache_dest-%d"));
        this.sourceLoadQueuePeon = new CuratorLoadQueuePeon(this.curator, SOURCE_LOAD_PATH, (ObjectMapper)objectMapper, this.peonExec, this.callbackExec, (DruidCoordinatorConfig)druidCoordinatorConfig);
        this.destinationLoadQueuePeon = new CuratorLoadQueuePeon(this.curator, DESTINATION_LOAD_PATH, (ObjectMapper)objectMapper, this.peonExec, this.callbackExec, (DruidCoordinatorConfig)druidCoordinatorConfig);
    }

    @After
    public void tearDown() throws Exception {
        this.baseView.stop();
        this.sourceLoadQueuePeon.stop();
        this.sourceLoadQueueChildrenCache.close();
        this.destinationLoadQueueChildrenCache.close();
        this.tearDownServerAndCurator();
    }

    @Test
    public void testStopDoesntKillPoolItDoesntOwn() throws Exception {
        this.setupView();
        this.sourceLoadQueuePeon.stop();
        Assert.assertFalse((boolean)this.peonExec.isShutdown());
        Assert.assertFalse((boolean)this.callbackExec.isShutdown());
    }

    @Test
    public void testMoveSegment() throws Exception {
        this.segmentViewInitLatch = new CountDownLatch(1);
        this.segmentAddedLatch = new CountDownLatch(4);
        this.segmentRemovedLatch = new CountDownLatch(0);
        this.setupView();
        DruidServer source = new DruidServer("localhost:1", "localhost:1", null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
        DruidServer dest = new DruidServer("localhost:2", "localhost:2", null, 10000000L, ServerType.HISTORICAL, "default_tier", 0);
        this.setupZNodeForServer(source, this.zkPathsConfig, this.jsonMapper);
        this.setupZNodeForServer(dest, this.zkPathsConfig, this.jsonMapper);
        List<DataSegment> sourceSegments = Arrays.asList(this.createSegment("2011-04-01/2011-04-03", "v1"), this.createSegment("2011-04-03/2011-04-06", "v1"), this.createSegment("2011-04-06/2011-04-09", "v1"));
        List<DataSegment> destinationSegments = Collections.singletonList(this.createSegment("2011-03-31/2011-04-01", "v1"));
        DataSegment segmentToMove = sourceSegments.get(2);
        ArrayList<String> sourceSegKeys = new ArrayList<String>();
        for (DataSegment segment : sourceSegments) {
            sourceSegKeys.add(this.announceBatchSegmentsForServer(source, (ImmutableSet<DataSegment>)ImmutableSet.of((Object)segment), this.zkPathsConfig, this.jsonMapper));
        }
        for (DataSegment segment : destinationSegments) {
            this.announceBatchSegmentsForServer(dest, (ImmutableSet<DataSegment>)ImmutableSet.of((Object)segment), this.zkPathsConfig, this.jsonMapper);
        }
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentViewInitLatch));
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentAddedLatch));
        CountDownLatch srcCountdown = new CountDownLatch(1);
        this.sourceLoadQueueChildrenCache.getListenable().addListener((curatorFramework, event) -> {
            if (event.getType().equals((Object)PathChildrenCacheEvent.Type.INITIALIZED)) {
                srcCountdown.countDown();
            } else if (CuratorUtils.isChildAdded((PathChildrenCacheEvent)event)) {
                this.unannounceSegmentFromBatchForServer(source, segmentToMove, (String)sourceSegKeys.get(2), this.zkPathsConfig);
            }
        });
        CountDownLatch destCountdown = new CountDownLatch(1);
        this.destinationLoadQueueChildrenCache.getListenable().addListener((curatorFramework, event) -> {
            if (event.getType().equals((Object)PathChildrenCacheEvent.Type.INITIALIZED)) {
                destCountdown.countDown();
            } else if (CuratorUtils.isChildAdded((PathChildrenCacheEvent)event)) {
                this.announceBatchSegmentsForServer(dest, (ImmutableSet<DataSegment>)ImmutableSet.of((Object)segmentToMove), this.zkPathsConfig, this.jsonMapper);
            }
        });
        this.sourceLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        this.destinationLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(srcCountdown));
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(destCountdown));
        sourceSegments.forEach(arg_0 -> ((DruidServer)source).addDataSegment(arg_0));
        destinationSegments.forEach(arg_0 -> ((DruidServer)dest).addDataSegment(arg_0));
        this.segmentRemovedLatch = new CountDownLatch(1);
        this.segmentAddedLatch = new CountDownLatch(1);
        ImmutableDruidDataSource druidDataSource = (ImmutableDruidDataSource)EasyMock.createNiceMock(ImmutableDruidDataSource.class);
        EasyMock.expect((Object)druidDataSource.getSegment((SegmentId)EasyMock.anyObject(SegmentId.class))).andReturn((Object)sourceSegments.get(2));
        EasyMock.replay((Object[])new Object[]{druidDataSource});
        EasyMock.expect((Object)this.coordinatorRuntimeParams.getDataSourcesSnapshot()).andReturn((Object)this.dataSourcesSnapshot).anyTimes();
        CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder().withUseRoundRobinSegmentAssignment(false).build();
        EasyMock.expect((Object)this.coordinatorRuntimeParams.getCoordinatorDynamicConfig()).andReturn((Object)dynamicConfig).anyTimes();
        EasyMock.expect((Object)this.coordinatorRuntimeParams.getSegmentLoadingConfig()).andReturn((Object)SegmentLoadingConfig.create((CoordinatorDynamicConfig)dynamicConfig, (int)100)).anyTimes();
        ServerHolder sourceServer = new ServerHolder(source.toImmutableDruidServer(), this.sourceLoadQueuePeon);
        ServerHolder destinationServer = new ServerHolder(dest.toImmutableDruidServer(), this.destinationLoadQueuePeon);
        DruidCluster cluster = DruidCluster.builder().add(sourceServer).add(destinationServer).build();
        BalancerStrategy balancerStrategy = (BalancerStrategy)EasyMock.mock(BalancerStrategy.class);
        EasyMock.expect((Object)balancerStrategy.findDestinationServerToMoveSegment((DataSegment)EasyMock.anyObject(), (ServerHolder)EasyMock.anyObject(), (List)EasyMock.anyObject())).andReturn((Object)destinationServer).atLeastOnce();
        EasyMock.expect((Object)this.coordinatorRuntimeParams.getBalancerStrategy()).andReturn((Object)balancerStrategy).anyTimes();
        EasyMock.expect((Object)this.coordinatorRuntimeParams.getDruidCluster()).andReturn((Object)cluster).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.coordinatorRuntimeParams, balancerStrategy});
        EasyMock.expect((Object)this.dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn((Object)druidDataSource).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.dataSourcesSnapshot});
        LoadQueueTaskMaster taskMaster = (LoadQueueTaskMaster)EasyMock.createMock(LoadQueueTaskMaster.class);
        EasyMock.expect((Object)taskMaster.isHttpLoading()).andReturn((Object)false).anyTimes();
        EasyMock.replay((Object[])new Object[]{taskMaster});
        SegmentLoadQueueManager loadQueueManager = new SegmentLoadQueueManager((ServerInventoryView)this.baseView, taskMaster);
        StrategicSegmentAssigner segmentAssigner = this.createSegmentAssigner(loadQueueManager, this.coordinatorRuntimeParams);
        segmentAssigner.moveSegment(segmentToMove, sourceServer, Collections.singletonList(destinationServer));
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentAddedLatch));
        ((ChildrenDeletable)this.curator.delete().guaranteed()).forPath(ZKPaths.makePath((String)DESTINATION_LOAD_PATH, (String)segmentToMove.getId().toString()));
        Assert.assertTrue((boolean)this.timing.forWaiting().awaitLatch(this.segmentRemovedLatch));
        ((ChildrenDeletable)this.curator.delete().guaranteed()).forPath(ZKPaths.makePath((String)SOURCE_LOAD_PATH, (String)segmentToMove.getId().toString()));
        ArrayList servers = new ArrayList(this.serverView.getInventory());
        Assert.assertEquals((long)2L, (long)((DruidServer)servers.get(0)).getTotalSegments());
        Assert.assertEquals((long)2L, (long)((DruidServer)servers.get(1)).getTotalSegments());
    }

    private void setupView() throws Exception {
        this.baseView = new BatchServerInventoryView(this.zkPathsConfig, this.curator, this.jsonMapper, Predicates.alwaysTrue(), "test"){

            public void registerSegmentCallback(Executor exec, final ServerView.SegmentCallback callback) {
                super.registerSegmentCallback(exec, new ServerView.SegmentCallback(){

                    public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                        ServerView.CallbackAction res = callback.segmentAdded(server, segment);
                        CuratorDruidCoordinatorTest.this.segmentAddedLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                        ServerView.CallbackAction res = callback.segmentRemoved(server, segment);
                        CuratorDruidCoordinatorTest.this.segmentRemovedLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentViewInitialized() {
                        ServerView.CallbackAction res = callback.segmentViewInitialized();
                        CuratorDruidCoordinatorTest.this.segmentViewInitLatch.countDown();
                        return res;
                    }

                    public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) {
                        return ServerView.CallbackAction.CONTINUE;
                    }
                });
            }
        };
        this.serverView = new CoordinatorServerView((ServerInventoryView)this.baseView, new CoordinatorSegmentWatcherConfig(), (ServiceEmitter)new NoopServiceEmitter(), null);
        this.baseView.start();
        this.sourceLoadQueuePeon.start();
        this.destinationLoadQueuePeon.start();
    }

    private DataSegment createSegment(String intervalStr, String version) {
        return DataSegment.builder().dataSource("test_curator_druid_coordinator").interval(Intervals.of((String)intervalStr)).loadSpec((Map)ImmutableMap.of((Object)"type", (Object)"local", (Object)"path", (Object)"somewhere")).version(version).dimensions((List)ImmutableList.of()).metrics((List)ImmutableList.of()).shardSpec((ShardSpec)NoneShardSpec.instance()).binaryVersion(Integer.valueOf(9)).size(0L).build();
    }

    private StrategicSegmentAssigner createSegmentAssigner(SegmentLoadQueueManager loadQueueManager, DruidCoordinatorRuntimeParams params) {
        return new StrategicSegmentAssigner(loadQueueManager, params.getDruidCluster(), params.getBalancerStrategy(), params.getSegmentLoadingConfig(), new CoordinatorRunStats());
    }
}

