/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordination.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.curator.announcement.ServiceAnnouncer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.SegmentSchemasChangeRequest;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BatchDataSegmentAnnouncerTest {
    private static final String TEST_BASE_PATH = "/test";
    private static final String TEST_SEGMENTS_PATH = "/test/segments/id";
    private static final Joiner JOINER = Joiner.on((String)"/");
    private static final int NUM_THREADS = 4;
    private TestingCluster testingCluster;
    private CuratorFramework cf;
    private ObjectMapper jsonMapper;
    private TestAnnouncer announcer;
    private SegmentReader segmentReader;
    private BatchDataSegmentAnnouncer segmentAnnouncer;
    private Set<DataSegment> testSegments;
    private final AtomicInteger maxBytesPerNode = new AtomicInteger(524288);
    private Boolean skipDimensionsAndMetrics;
    private Boolean skipLoadSpec;
    private ExecutorService exec;

    @Before
    public void setUp() throws Exception {
        this.testingCluster = new TestingCluster(1);
        this.testingCluster.start();
        this.cf = CuratorFrameworkFactory.builder().connectString(this.testingCluster.getConnectString()).retryPolicy((RetryPolicy)new ExponentialBackoffRetry(1, 10)).compressionProvider((CompressionProvider)new PotentiallyGzippedCompressionProvider(false)).build();
        this.cf.start();
        this.cf.blockUntilConnected();
        this.cf.create().creatingParentsIfNeeded().forPath(TEST_BASE_PATH);
        this.jsonMapper = TestHelper.makeJsonMapper();
        this.announcer = new TestAnnouncer(this.cf, (ExecutorService)Execs.directExecutor());
        this.announcer.start();
        this.segmentReader = new SegmentReader(this.cf, this.jsonMapper);
        this.skipDimensionsAndMetrics = false;
        this.skipLoadSpec = false;
        this.segmentAnnouncer = new BatchDataSegmentAnnouncer(new DruidServerMetadata("id", "host", null, Long.MAX_VALUE, ServerType.HISTORICAL, "tier", 0), new BatchDataSegmentAnnouncerConfig(){

            public int getSegmentsPerNode() {
                return 50;
            }

            public long getMaxBytesPerNode() {
                return BatchDataSegmentAnnouncerTest.this.maxBytesPerNode.get();
            }

            public boolean isSkipDimensionsAndMetrics() {
                return BatchDataSegmentAnnouncerTest.this.skipDimensionsAndMetrics;
            }

            public boolean isSkipLoadSpec() {
                return BatchDataSegmentAnnouncerTest.this.skipLoadSpec;
            }
        }, new ZkPathsConfig(){

            public String getBase() {
                return BatchDataSegmentAnnouncerTest.TEST_BASE_PATH;
            }
        }, (ServiceAnnouncer)this.announcer, this.jsonMapper);
        this.testSegments = new HashSet<DataSegment>();
        for (int i = 0; i < 100; ++i) {
            this.testSegments.add(BatchDataSegmentAnnouncerTest.makeSegment(i));
        }
        this.exec = Execs.multiThreaded((int)4, (String)"BatchDataSegmentAnnouncerTest-%d");
    }

    @After
    public void tearDown() throws Exception {
        this.announcer.stop();
        this.cf.close();
        this.testingCluster.stop();
        this.exec.shutdownNow();
    }

    @Test
    public void testSingleAnnounce() throws Exception {
        Set<DataSegment> segments;
        Iterator<DataSegment> segIter = this.testSegments.iterator();
        DataSegment firstSegment = segIter.next();
        DataSegment secondSegment = segIter.next();
        this.segmentAnnouncer.announceSegment(firstSegment);
        List zNodes = (List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        for (String zNode : zNodes) {
            segments = this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0]));
            Assert.assertEquals((Object)segments.iterator().next(), (Object)firstSegment);
        }
        this.segmentAnnouncer.announceSegment(secondSegment);
        for (String zNode : zNodes) {
            segments = this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0]));
            Assert.assertEquals((Object)Sets.newHashSet((Object[])new DataSegment[]{firstSegment, secondSegment}), segments);
        }
        ChangeRequestsSnapshot snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((long)2L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)2L, (long)snapshot.getCounter().getCounter());
        this.segmentAnnouncer.unannounceSegment(firstSegment);
        for (String zNode : zNodes) {
            Set<DataSegment> segments2 = this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0]));
            Assert.assertEquals((Object)segments2.iterator().next(), (Object)secondSegment);
        }
        this.segmentAnnouncer.unannounceSegment(secondSegment);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH)).isEmpty());
        snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(snapshot.getCounter()).get();
        Assert.assertEquals((long)2L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)4L, (long)snapshot.getCounter().getCounter());
        snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((long)0L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)4L, (long)snapshot.getCounter().getCounter());
    }

    @Test
    public void testSingleTombstoneAnnounce() throws Exception {
        DataSegment firstSegment = BatchDataSegmentAnnouncerTest.makeSegment(0, true);
        this.segmentAnnouncer.announceSegment(firstSegment);
        List zNodes = (List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        for (String zNode : zNodes) {
            Set<DataSegment> segments = this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0]));
            Assert.assertEquals((Object)segments.iterator().next(), (Object)firstSegment);
        }
        ChangeRequestsSnapshot snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((long)1L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)1L, (long)snapshot.getCounter().getCounter());
        this.segmentAnnouncer.unannounceSegment(firstSegment);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH)).isEmpty());
        snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(snapshot.getCounter()).get();
        Assert.assertEquals((long)1L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)2L, (long)snapshot.getCounter().getCounter());
        snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((long)0L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)2L, (long)snapshot.getCounter().getCounter());
    }

    @Test
    public void testSkipDimensions() throws Exception {
        this.skipDimensionsAndMetrics = true;
        Iterator<DataSegment> segIter = this.testSegments.iterator();
        DataSegment firstSegment = segIter.next();
        this.segmentAnnouncer.announceSegment(firstSegment);
        List zNodes = (List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        for (String zNode : zNodes) {
            DataSegment announcedSegment = (DataSegment)Iterables.getOnlyElement(this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0])));
            Assert.assertEquals((Object)announcedSegment, (Object)firstSegment);
            Assert.assertTrue((boolean)announcedSegment.getDimensions().isEmpty());
            Assert.assertTrue((boolean)announcedSegment.getMetrics().isEmpty());
        }
        this.segmentAnnouncer.unannounceSegment(firstSegment);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH)).isEmpty());
    }

    @Test
    public void testSkipLoadSpec() throws Exception {
        this.skipLoadSpec = true;
        Iterator<DataSegment> segIter = this.testSegments.iterator();
        DataSegment firstSegment = segIter.next();
        this.segmentAnnouncer.announceSegment(firstSegment);
        List zNodes = (List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        for (String zNode : zNodes) {
            DataSegment announcedSegment = (DataSegment)Iterables.getOnlyElement(this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0])));
            Assert.assertEquals((Object)announcedSegment, (Object)firstSegment);
            Assert.assertNull((Object)announcedSegment.getLoadSpec());
        }
        this.segmentAnnouncer.unannounceSegment(firstSegment);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH)).isEmpty());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSingleAnnounceManyTimes() throws Exception {
        int prevMax = this.maxBytesPerNode.get();
        this.maxBytesPerNode.set(2048);
        try {
            for (DataSegment segment : this.testSegments) {
                this.segmentAnnouncer.announceSegment(segment);
            }
        }
        finally {
            this.maxBytesPerNode.set(prevMax);
        }
        List zNodes = (List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        Assert.assertEquals((long)20L, (long)zNodes.size());
        HashSet segments = Sets.newHashSet(this.testSegments);
        for (String zNode : zNodes) {
            for (DataSegment segment : this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0]))) {
                Assert.assertTrue((String)("Invalid segment " + segment), (boolean)segments.remove(segment));
            }
        }
        Assert.assertTrue((String)("Failed to find segments " + segments), (boolean)segments.isEmpty());
    }

    @Test
    public void testBatchAnnounce() throws Exception {
        this.testBatchAnnounce(true);
    }

    @Test
    public void testMultipleBatchAnnounce() throws Exception {
        for (int i = 0; i < 10; ++i) {
            this.testBatchAnnounce(false);
        }
    }

    @Test
    public void testSchemaAnnounce() throws Exception {
        String dataSource = "foo";
        String segmentId = "id";
        String taskId = "t1";
        SegmentSchemas.SegmentSchema absoluteSchema1 = new SegmentSchemas.SegmentSchema(dataSource, segmentId, false, Integer.valueOf(20), (List)ImmutableList.of((Object)"dim1", (Object)"dim2"), Collections.emptyList(), (Map)ImmutableMap.of((Object)"dim1", (Object)ColumnType.STRING, (Object)"dim2", (Object)ColumnType.STRING));
        SegmentSchemas.SegmentSchema absoluteSchema2 = new SegmentSchemas.SegmentSchema(dataSource, segmentId, false, Integer.valueOf(40), (List)ImmutableList.of((Object)"dim1", (Object)"dim2", (Object)"dim3"), (List)ImmutableList.of(), (Map)ImmutableMap.of((Object)"dim1", (Object)ColumnType.UNKNOWN_COMPLEX, (Object)"dim2", (Object)ColumnType.STRING, (Object)"dim3", (Object)ColumnType.STRING));
        SegmentSchemas.SegmentSchema deltaSchema = new SegmentSchemas.SegmentSchema(dataSource, segmentId, true, Integer.valueOf(40), (List)ImmutableList.of((Object)"dim3"), (List)ImmutableList.of((Object)"dim1"), (Map)ImmutableMap.of((Object)"dim1", (Object)ColumnType.UNKNOWN_COMPLEX, (Object)"dim3", (Object)ColumnType.STRING));
        this.segmentAnnouncer.announceSegmentSchemas(taskId, new SegmentSchemas(Collections.singletonList(absoluteSchema1)), new SegmentSchemas(Collections.singletonList(absoluteSchema1)));
        ChangeRequestsSnapshot snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((long)1L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)1L, (long)snapshot.getCounter().getCounter());
        Assert.assertEquals((Object)absoluteSchema1, ((SegmentSchemasChangeRequest)snapshot.getRequests().get(0)).getSegmentSchemas().getSegmentSchemaList().get(0));
        this.segmentAnnouncer.announceSegmentSchemas(taskId, new SegmentSchemas(Collections.singletonList(absoluteSchema2)), new SegmentSchemas(Collections.singletonList(deltaSchema)));
        snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(snapshot.getCounter()).get();
        Assert.assertEquals((Object)deltaSchema, ((SegmentSchemasChangeRequest)snapshot.getRequests().get(0)).getSegmentSchemas().getSegmentSchemaList().get(0));
        Assert.assertEquals((long)1L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)2L, (long)snapshot.getCounter().getCounter());
        snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((Object)absoluteSchema2, ((SegmentSchemasChangeRequest)snapshot.getRequests().get(0)).getSegmentSchemas().getSegmentSchemaList().get(0));
        Assert.assertEquals((long)1L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)2L, (long)snapshot.getCounter().getCounter());
    }

    private void testBatchAnnounce(boolean testHistory) throws Exception {
        this.segmentAnnouncer.announceSegments(this.testSegments);
        List zNodes = (List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        Assert.assertEquals((long)2L, (long)zNodes.size());
        HashSet<DataSegment> allSegments = new HashSet<DataSegment>();
        for (String zNode : zNodes) {
            allSegments.addAll(this.segmentReader.read(JOINER.join((Object)TEST_SEGMENTS_PATH, (Object)zNode, new Object[0])));
        }
        Assert.assertEquals(allSegments, this.testSegments);
        ChangeRequestsSnapshot snapshot = null;
        if (testHistory) {
            snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
            Assert.assertEquals((long)this.testSegments.size(), (long)snapshot.getRequests().size());
            Assert.assertEquals((long)this.testSegments.size(), (long)snapshot.getCounter().getCounter());
        }
        this.segmentAnnouncer.unannounceSegments(this.testSegments);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(TEST_SEGMENTS_PATH)).isEmpty());
        if (testHistory) {
            snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(snapshot.getCounter()).get();
            Assert.assertEquals((long)this.testSegments.size(), (long)snapshot.getRequests().size());
            Assert.assertEquals((long)(2 * this.testSegments.size()), (long)snapshot.getCounter().getCounter());
            snapshot = (ChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
            Assert.assertEquals((long)0L, (long)snapshot.getRequests().size());
            Assert.assertEquals((long)(2 * this.testSegments.size()), (long)snapshot.getCounter().getCounter());
        }
    }

    @Test(timeout=5000L)
    public void testAnnounceSegmentsWithSameSegmentConcurrently() throws ExecutionException, InterruptedException {
        ArrayList futures = new ArrayList(4);
        for (int i = 0; i < 4; ++i) {
            futures.add(this.exec.submit(() -> {
                try {
                    this.segmentAnnouncer.announceSegments(this.testSegments);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }));
        }
        for (Future future : futures) {
            future.get();
        }
        Assert.assertEquals((long)2L, (long)this.announcer.numPathAnnounced.size());
        for (ConcurrentHashMap concurrentHashMap : this.announcer.numPathAnnounced.values()) {
            for (Map.Entry entry : concurrentHashMap.entrySet()) {
                Assert.assertEquals((long)1L, (long)((AtomicInteger)entry.getValue()).get());
            }
        }
    }

    @Test(timeout=5000L)
    public void testAnnounceSegmentWithSameSegmentConcurrently() throws ExecutionException, InterruptedException {
        ArrayList futures = new ArrayList(4);
        DataSegment segment1 = BatchDataSegmentAnnouncerTest.makeSegment(0);
        DataSegment segment2 = BatchDataSegmentAnnouncerTest.makeSegment(1);
        DataSegment segment3 = BatchDataSegmentAnnouncerTest.makeSegment(2);
        DataSegment segment4 = BatchDataSegmentAnnouncerTest.makeSegment(3);
        for (int i = 0; i < 4; ++i) {
            futures.add(this.exec.submit(() -> {
                try {
                    this.segmentAnnouncer.announceSegment(segment1);
                    this.segmentAnnouncer.announceSegment(segment2);
                    this.segmentAnnouncer.announceSegment(segment3);
                    this.segmentAnnouncer.announceSegment(segment4);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }));
        }
        for (Future future : futures) {
            future.get();
        }
        Assert.assertEquals((long)1L, (long)this.announcer.numPathAnnounced.size());
        for (ConcurrentHashMap concurrentHashMap : this.announcer.numPathAnnounced.values()) {
            for (Map.Entry entry : concurrentHashMap.entrySet()) {
                Assert.assertEquals((long)1L, (long)((AtomicInteger)entry.getValue()).get());
            }
        }
    }

    private static DataSegment makeSegment(int offset, boolean isTombstone) {
        Interval interval = new Interval((ReadableInstant)DateTimes.of((String)"2013-01-01").plusDays(offset), (ReadableInstant)DateTimes.of((String)"2013-01-02").plusDays(offset));
        SegmentId segmentId = SegmentId.of((String)"foo", (Interval)interval, (String)DateTimes.nowUtc().toString(), null);
        DataSegment.Builder builder = DataSegment.builder((SegmentId)segmentId).loadSpec((Map)ImmutableMap.of((Object)"type", (Object)"local")).dimensions((List)ImmutableList.of((Object)"dim1", (Object)"dim2")).metrics((List)ImmutableList.of((Object)"met1", (Object)"met2")).projections((List)ImmutableList.of((Object)"proj1", (Object)"proj2")).size(0L);
        if (isTombstone) {
            builder.loadSpec(Collections.singletonMap("type", "tombstone"));
        }
        return builder.build();
    }

    private static DataSegment makeSegment(int offset) {
        return BatchDataSegmentAnnouncerTest.makeSegment(offset, false);
    }

    private static class TestAnnouncer
    extends NodeAnnouncer {
        private final ConcurrentHashMap<String, ConcurrentHashMap<byte[], AtomicInteger>> numPathAnnounced = new ConcurrentHashMap();

        private TestAnnouncer(CuratorFramework curator, ExecutorService exec) {
            super(curator, exec);
        }

        public void announce(String path, byte[] bytes, boolean removeParentIfCreated) {
            this.numPathAnnounced.computeIfAbsent(path, k -> new ConcurrentHashMap()).computeIfAbsent(bytes, k -> new AtomicInteger(0)).incrementAndGet();
            super.announce(path, bytes, removeParentIfCreated);
        }
    }

    private static class SegmentReader {
        private final CuratorFramework cf;
        private final ObjectMapper jsonMapper;

        public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper) {
            this.cf = cf;
            this.jsonMapper = jsonMapper;
        }

        public Set<DataSegment> read(String path) {
            try {
                if (this.cf.checkExists().forPath(path) != null) {
                    return (Set)this.jsonMapper.readValue((byte[])this.cf.getData().forPath(path), (TypeReference)new TypeReference<Set<DataSegment>>(){});
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return new HashSet<DataSegment>();
        }
    }
}

