/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
import org.apache.druid.segment.realtime.appenderator.TestUsedSegmentChecker;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamAppenderatorDriverTest
extends EasyMockSupport {
    // Data source name given to the test allocator and used in the expected segment ids.
    private static final String DATA_SOURCE = "foo";
    // Version string stamped on every segment id produced by TestSegmentAllocator.
    private static final String VERSION = "abc123";
    private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
    // In-memory row limit for the appenderator tester (100 rows).
    private static final int MAX_ROWS_IN_MEMORY = 100;
    // Row count per segment (3) that testMaxRowsPerSegment treats as the limit.
    private static final int MAX_ROWS_PER_SEGMENT = 3;
    // Upper bound (10 seconds) on waiting for a publish future to complete.
    private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10L);
    // Upper bound (1 second) on waiting for a handoff future to complete.
    private static final long HANDOFF_CONDITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1L);
    // Fixture rows: one row timestamped "2000" and two rows timestamped "2000T01",
    // so with hourly granularity they fall into two distinct hour buckets.
    private static final List<InputRow> ROWS = Arrays.asList(new MapBasedInputRow(DateTimes.of((String)"2000"), (List)ImmutableList.of((Object)"dim1"), (Map)ImmutableMap.of((Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1")), new MapBasedInputRow(DateTimes.of((String)"2000T01"), (List)ImmutableList.of((Object)"dim1"), (Map)ImmutableMap.of((Object)"dim1", (Object)"foo", (Object)"met1", (Object)2.0)), new MapBasedInputRow(DateTimes.of((String)"2000T01"), (List)ImmutableList.of((Object)"dim2"), (Map)ImmutableMap.of((Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
    // Per-test collaborators; all are (re)created in setUp().
    private SegmentAllocator allocator;
    private StreamAppenderatorTester streamAppenderatorTester;
    private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
    private StreamAppenderatorDriver driver;
    // Strict mock with no recorded expectations; verified in tearDown().
    private DataSegmentKiller dataSegmentKiller;

    /**
     * Creates a fresh appenderator tester, hourly test allocator, handoff notifier
     * factory, strict {@link DataSegmentKiller} mock and the driver under test.
     * The mock is switched to replay mode with no expectations recorded.
     */
    @Before
    public void setUp() {
        streamAppenderatorTester = new StreamAppenderatorTester(MAX_ROWS_IN_MEMORY);
        allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
        segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
        dataSegmentKiller = createStrictMock(DataSegmentKiller.class);

        driver = new StreamAppenderatorDriver(
            streamAppenderatorTester.getAppenderator(),
            allocator,
            segmentHandoffNotifierFactory,
            new TestUsedSegmentChecker(streamAppenderatorTester.getPushedSegments()),
            dataSegmentKiller,
            OBJECT_MAPPER,
            new FireDepartmentMetrics()
        );

        EasyMock.replay(dataSegmentKiller);
    }

    /**
     * Verifies the {@link DataSegmentKiller} mock and releases the driver.
     *
     * <p>The driver is cleared and closed in {@code finally} blocks so that a failed
     * mock verification (or a failing {@code clear()}) cannot leave the driver open.
     *
     * @throws Exception if verification fails or the driver cannot be cleared/closed
     */
    @After
    public void tearDown() throws Exception {
        try {
            EasyMock.verify(dataSegmentKiller);
        } finally {
            try {
                driver.clear();
            } finally {
                // Always close, even when verify() or clear() has thrown.
                driver.close();
            }
        }
    }

    /**
     * Adds every fixture row under the "dummy" sequence, publishes it, waits until the
     * driver no longer tracks that sequence, then registers handoff and checks that one
     * segment per hour bucket was handed off with the last commit metadata (3).
     */
    @Test(timeout=60000L)
    public void testSimple() throws Exception {
        final TestCommitterSupplier<Integer> committers = new TestCommitterSupplier<>();

        Assert.assertNull(driver.startJob(null));

        // Commit metadata tracks the 1-based count of rows added so far.
        int rowsAdded = 0;
        for (InputRow row : ROWS) {
            committers.setMetadata(++rowsAdded);
            Assert.assertTrue(driver.add(row, "dummy", committers, false, true).isOk());
        }

        final SegmentsAndCommitMetadata published = driver
            .publish(makeOkPublisher(), committers.get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        // Poll until the published sequence has been dropped from the driver's segment map.
        while (driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }

        final SegmentsAndCommitMetadata handedOff = driver
            .registerHandoff(published)
            .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        final Set<SegmentIdWithShardSpec> expectedIds = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(expectedIds, asIdentifiers(handedOff.getSegments()));
        Assert.assertEquals(3, handedOff.getCommitMetadata());
    }

    /**
     * Adds {@code numSegments * MAX_ROWS_PER_SEGMENT} rows to the same hour bucket and
     * moves a segment out of the "dummy" sequence whenever an add reports more than
     * {@code MAX_ROWS_PER_SEGMENT} rows in it. After publishing and handoff, the test
     * expects {@code numSegments} segments and commit metadata equal to the row count.
     *
     * <p>A timeout is set because the polling loop below has no other bound; the
     * sibling tests that use the same loop are bounded the same way.
     */
    @Test(timeout=60000L)
    public void testMaxRowsPerSegment() throws Exception {
        final int numSegments = 3;
        final int totalRows = numSegments * MAX_ROWS_PER_SEGMENT;
        final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();

        Assert.assertNull(driver.startJob(null));

        for (int i = 0; i < totalRows; ++i) {
            committerSupplier.setMetadata(i + 1);
            final InputRow row = new MapBasedInputRow(
                DateTimes.of("2000T01"),
                ImmutableList.of("dim2"),
                ImmutableMap.of("dim2", StringUtils.format("bar-%d", i), "met1", 2.0)
            );
            final AppenderatorDriverAddResult addResult = driver.add(row, "dummy", committerSupplier, false, true);
            Assert.assertTrue(addResult.isOk());
            // Once a segment exceeds the per-segment limit, retire it from the active
            // sequence so the next row is allocated a new segment.
            if (addResult.getNumRowsInSegment() > MAX_ROWS_PER_SEGMENT) {
                driver.moveSegmentOut("dummy", ImmutableList.of(addResult.getSegmentIdentifier()));
            }
        }

        final SegmentsAndCommitMetadata published = driver
            .publish(makeOkPublisher(), committerSupplier.get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        // Poll until the published sequence has been dropped from the driver's segment map.
        while (driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }

        final SegmentsAndCommitMetadata segmentsAndCommitMetadata = driver
            .registerHandoff(published)
            .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        Assert.assertEquals(numSegments, segmentsAndCommitMetadata.getSegments().size());
        Assert.assertEquals(totalRows, segmentsAndCommitMetadata.getCommitMetadata());
    }

    /**
     * With handoff disabled on the notifier factory, publishing still succeeds but the
     * handoff future never completes, so waiting on it is expected to throw
     * {@link TimeoutException}.
     */
    @Test(timeout=60000L, expected=TimeoutException.class)
    public void testHandoffTimeout() throws Exception {
        final TestCommitterSupplier<Integer> committers = new TestCommitterSupplier<>();
        segmentHandoffNotifierFactory.disableHandoff();

        Assert.assertNull(driver.startJob(null));

        // Commit metadata tracks the 1-based count of rows added so far.
        int rowsAdded = 0;
        for (InputRow row : ROWS) {
            committers.setMetadata(++rowsAdded);
            Assert.assertTrue(driver.add(row, "dummy", committers, false, true).isOk());
        }

        final SegmentsAndCommitMetadata published = driver
            .publish(makeOkPublisher(), committers.get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        // Poll until the published sequence has been dropped from the driver's segment map.
        while (driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }

        // Expected to time out: the notifier never runs the handoff callback.
        driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Publishes and hands off after every single row. The first row yields the
     * "2000" hour segment; each later row yields a segment in the "2000T01" hour with
     * partition number {@code i - 1}. A final publish after a persist, with no new
     * rows, is expected to return no segments but still carry the last metadata (3).
     */
    @Test
    public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        final TestCommitterSupplier<Integer> committers = new TestCommitterSupplier<>();
        final List<String> sequences = ImmutableList.of("dummy");

        Assert.assertNull(driver.startJob(null));

        // First row: lands in the "2000" hour bucket.
        committers.setMetadata(1);
        Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committers, false, true).isOk());
        final SegmentsAndCommitMetadata firstResult = driver
            .publishAndRegisterHandoff(makeOkPublisher(), committers.get(), sequences)
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        final Set<SegmentIdWithShardSpec> firstExpected = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(firstExpected, asIdentifiers(firstResult.getSegments()));
        Assert.assertEquals(1, firstResult.getCommitMetadata());

        // Remaining rows: each one is published on its own in the "2000T01" hour bucket.
        for (int i = 1; i < ROWS.size(); ++i) {
            committers.setMetadata(i + 1);
            Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committers, false, true).isOk());
            final SegmentsAndCommitMetadata rowResult = driver
                .publishAndRegisterHandoff(makeOkPublisher(), committers.get(), sequences)
                .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            final Set<SegmentIdWithShardSpec> rowExpected = ImmutableSet.of(
                new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(i - 1, 0))
            );
            Assert.assertEquals(rowExpected, asIdentifiers(rowResult.getSegments()));
            Assert.assertEquals(i + 1, rowResult.getCommitMetadata());
        }

        // Nothing new was added, so this publish should produce an empty segment set.
        driver.persist(committers.get());
        final SegmentsAndCommitMetadata emptyResult = driver
            .publishAndRegisterHandoff(makeOkPublisher(), committers.get(), sequences)
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(), asIdentifiers(emptyResult.getSegments()));
        Assert.assertEquals(3, emptyResult.getCommitMetadata());
    }

    /**
     * Adds the first fixture row under "sequence_0" and the remaining rows under
     * "sequence_1", then publishes and hands off the two sequences independently.
     * Each future is expected to report only its own sequence's segment, and both
     * carry the committer metadata that was current at publish time (3).
     */
    @Test
    public void testIncrementalHandoff() throws Exception {
        final TestCommitterSupplier<Integer> committers = new TestCommitterSupplier<>();

        Assert.assertNull(driver.startJob(null));

        committers.setMetadata(1);
        Assert.assertTrue(driver.add(ROWS.get(0), "sequence_0", committers, false, true).isOk());

        for (int i = 1; i < ROWS.size(); ++i) {
            committers.setMetadata(i + 1);
            Assert.assertTrue(driver.add(ROWS.get(i), "sequence_1", committers, false, true).isOk());
        }

        // Kick off both publishes before waiting on either of them.
        final ListenableFuture<SegmentsAndCommitMetadata> sequence0Future = driver.publishAndRegisterHandoff(
            makeOkPublisher(),
            committers.get(),
            ImmutableList.of("sequence_0")
        );
        final ListenableFuture<SegmentsAndCommitMetadata> sequence1Future = driver.publishAndRegisterHandoff(
            makeOkPublisher(),
            committers.get(),
            ImmutableList.of("sequence_1")
        );

        final SegmentsAndCommitMetadata sequence0Result =
            sequence0Future.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        final SegmentsAndCommitMetadata sequence1Result =
            sequence1Future.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        final Set<SegmentIdWithShardSpec> sequence0Expected = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(sequence0Expected, asIdentifiers(sequence0Result.getSegments()));

        final Set<SegmentIdWithShardSpec> sequence1Expected = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(sequence1Expected, asIdentifiers(sequence1Result.getSegments()));

        Assert.assertEquals(3, sequence0Result.getCommitMetadata());
        Assert.assertEquals(3, sequence1Result.getCommitMetadata());
    }

    /**
     * Converts each segment to its {@link SegmentIdWithShardSpec} via
     * {@code SegmentIdWithShardSpec.fromDataSegment} and collects the results
     * into an immutable set.
     *
     * @param segments segments to convert
     * @return immutable set of the corresponding identifiers
     */
    private Set<SegmentIdWithShardSpec> asIdentifiers(Iterable<DataSegment> segments) {
        final ImmutableSet.Builder<SegmentIdWithShardSpec> identifiers = ImmutableSet.builder();
        for (DataSegment segment : segments) {
            identifiers.add(SegmentIdWithShardSpec.fromDataSegment(segment));
        }
        return identifiers.build();
    }

    /**
     * Creates a publisher that ignores all of its arguments and always reports
     * success with an empty segment set.
     *
     * @return publisher whose result is {@code SegmentPublishResult.ok(emptySet)}
     */
    static TransactionalSegmentPublisher makeOkPublisher() {
        return (overwritten, dropped, toPublish, metadata) -> {
            return SegmentPublishResult.ok(Collections.emptySet());
        };
    }

    /**
     * Creates a publisher that always fails, ignoring its arguments.
     *
     * @param failWithException when {@code true} the publisher throws a
     *        {@link RuntimeException} with message "test"; otherwise it returns a
     *        failed {@link SegmentPublishResult} carrying that same message
     * @return the always-failing publisher
     */
    static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) {
        return (overwritten, dropped, toPublish, metadata) -> {
            final RuntimeException failure = new RuntimeException("test");
            if (!failWithException) {
                return SegmentPublishResult.fail(failure.getMessage());
            }
            throw failure;
        };
    }

    // Runs once when this test class is loaded, before any test method executes.
    static {
        // NOTE(review): presumably configures Druid's null-handling mode for tests;
        // confirm against NullHandling.initializeForTests().
        NullHandling.initializeForTests();
    }

    static class TestSegmentHandoffNotifierFactory
    implements SegmentHandoffNotifierFactory {
        private boolean handoffEnabled = true;
        private long handoffDelay;

        TestSegmentHandoffNotifierFactory() {
        }

        public void disableHandoff() {
            this.handoffEnabled = false;
        }

        public void setHandoffDelay(long delay) {
            this.handoffDelay = delay;
        }

        public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) {
            return new SegmentHandoffNotifier(){

                public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable) {
                    if (handoffEnabled) {
                        if (handoffDelay > 0L) {
                            try {
                                Thread.sleep(handoffDelay);
                            }
                            catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                        exec.execute(handOffRunnable);
                    }
                    return true;
                }

                public void start() {
                }

                public void close() {
                }
            };
        }
    }

    static class TestSegmentAllocator
    implements SegmentAllocator {
        private final String dataSource;
        private final Granularity granularity;
        private final Map<Long, AtomicInteger> counters = new HashMap<Long, AtomicInteger>();

        public TestSegmentAllocator(String dataSource, Granularity granularity) {
            this.dataSource = dataSource;
            this.granularity = granularity;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public SegmentIdWithShardSpec allocate(InputRow row, String sequenceName, String previousSegmentId, boolean skipSegmentLineageCheck) {
            Map<Long, AtomicInteger> map = this.counters;
            synchronized (map) {
                DateTime dateTimeTruncated = this.granularity.bucketStart(row.getTimestamp());
                long timestampTruncated = dateTimeTruncated.getMillis();
                this.counters.putIfAbsent(timestampTruncated, new AtomicInteger());
                int partitionNum = this.counters.get(timestampTruncated).getAndIncrement();
                return new SegmentIdWithShardSpec(this.dataSource, this.granularity.bucket(dateTimeTruncated), StreamAppenderatorDriverTest.VERSION, (ShardSpec)new NumberedShardSpec(partitionNum, 0));
            }
        }
    }

    static class TestCommitterSupplier<T>
    implements Supplier<Committer> {
        private final AtomicReference<T> metadata = new AtomicReference();

        TestCommitterSupplier() {
        }

        public void setMetadata(T newMetadata) {
            this.metadata.set(newMetadata);
        }

        public Committer get() {
            final T currentMetadata = this.metadata.get();
            return new Committer(){

                public Object getMetadata() {
                    return currentMetadata;
                }

                public void run() {
                }
            };
        }
    }
}

