/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.appenderator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderator;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorTester;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.joda.time.Chronology;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;

public class BatchAppenderatorTest
extends InitializedNullHandlingTest {
    private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of((Object)BatchAppenderatorTest.createSegmentId("2000/2001", "A", 0), (Object)BatchAppenderatorTest.createSegmentId("2000/2001", "A", 1), (Object)BatchAppenderatorTest.createSegmentId("2001/2002", "A", 0));

    @Test
    public void testSimpleIngestion() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(3, false);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertNull((Object)appenderator.startJob());
            Assert.assertEquals((Object)"foo", (Object)appenderator.getDataSource());
            Assert.assertEquals((long)1L, (long)appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null).getNumRowsInSegment());
            Assert.assertEquals((long)1L, (long)appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 2), null).getNumRowsInSegment());
            Assert.assertEquals(IDENTIFIERS.subList(0, 2), appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals((long)2L, (long)appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "sux", 1), null).getNumRowsInSegment());
            Assert.assertEquals(Collections.emptyList(), ((BatchAppenderator)appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals((long)1L, (long)appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "qux", 4), null).getNumRowsInSegment());
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata)appenderator.push((Collection)appenderator.getSegments(), null, false).get();
            Assert.assertEquals(IDENTIFIERS.subList(0, 3), Lists.transform((List)segmentsAndCommitMetadata.getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
            appenderator.close();
            Assert.assertTrue((boolean)appenderator.getSegments().isEmpty());
        }
    }

    @Test
    public void testPushFailure() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(3, true);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertNull((Object)appenderator.startJob());
            Assert.assertEquals((Object)"foo", (Object)appenderator.getDataSource());
            Assert.assertEquals((long)1L, (long)appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null).getNumRowsInSegment());
            Assert.assertEquals((long)1L, (long)appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 2), null).getNumRowsInSegment());
            Assert.assertEquals(IDENTIFIERS.subList(0, 2), appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals((long)2L, (long)appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "sux", 1), null).getNumRowsInSegment());
            Assert.assertEquals(Collections.emptyList(), ((BatchAppenderator)appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals((long)1L, (long)appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "qux", 4), null).getNumRowsInSegment());
            ListenableFuture segmentsAndCommitMetadata = appenderator.push((Collection)appenderator.getSegments(), null, false);
            ExecutionException e = (ExecutionException)Assert.assertThrows(ExecutionException.class, () -> segmentsAndCommitMetadata.get());
            MatcherAssert.assertThat((Object)e, (Matcher)ThrowableCauseMatcher.hasCause((Matcher)ThrowableCauseMatcher.hasCause((Matcher)CoreMatchers.instanceOf(IOException.class))));
            MatcherAssert.assertThat((Object)e, (Matcher)ThrowableCauseMatcher.hasCause((Matcher)ThrowableCauseMatcher.hasCause((Matcher)ThrowableMessageMatcher.hasMessage((Matcher)CoreMatchers.startsWith((String)"Push failure test")))));
        }
    }

    @Test
    public void testPeriodGranularityNonUTCIngestion() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(1, false);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertNull((Object)appenderator.startJob());
            Assert.assertEquals((Object)"foo", (Object)appenderator.getDataSource());
            SegmentIdWithShardSpec segmentIdWithNonUTCTime = BatchAppenderatorTest.createNonUTCSegmentId("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00", "A", 0);
            Assert.assertEquals((long)1L, (long)appenderator.add(segmentIdWithNonUTCTime, BatchAppenderatorTest.createInputRow("2021-06-27T00:01:11.080Z", "foo", 1), null).getNumRowsInSegment());
            Assert.assertEquals(Collections.singletonList(segmentIdWithNonUTCTime), appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(Collections.emptyList(), ((BatchAppenderator)appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList()));
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata)appenderator.push((Collection)appenderator.getSegments(), null, false).get();
            Assert.assertEquals(Collections.singletonList(segmentIdWithNonUTCTime), Lists.transform((List)segmentsAndCommitMetadata.getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
            appenderator.close();
            Assert.assertTrue((boolean)appenderator.getSegments().isEmpty());
        }
    }

    @Test
    public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 1024L, null, true, (RowIngestionMeters)new SimpleRowIngestionMeters(), true);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            int nullHandlingOverhead = 1;
            Assert.assertEquals((long)(182 + nullHandlingOverhead), (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            Assert.assertEquals((long)(182 + nullHandlingOverhead), (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(1)));
            appenderator.close();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        }
    }

    @Test
    public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 1024L, null, true, (RowIngestionMeters)new SimpleRowIngestionMeters(), true);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            int nullHandlingOverhead = 1;
            Assert.assertEquals((long)(182 + nullHandlingOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            Assert.assertEquals((long)(364 + 2 * nullHandlingOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            Assert.assertEquals((long)2L, (long)appenderator.getSegments().size());
            appenderator.close();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        }
    }

    @Test
    public void testMaxBytesInMemory() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 15000L, true);){
            int i;
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            int nullHandlingOverhead = 1;
            int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            int sinkSizeOverhead = 5000;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)(currentInMemoryIndexSize + sinkSizeOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            for (i = 0; i < 53; ++i) {
                appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar_" + i, 1), null);
            }
            currentInMemoryIndexSize = 0;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bob", 1), null);
            currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)(currentInMemoryIndexSize + sinkSizeOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            for (i = 0; i < 53; ++i) {
                appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar_" + i, 1), null);
            }
            currentInMemoryIndexSize = 0;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.persistAll(null).get();
            appenderator.close();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
        }
    }

    @Test(expected=RuntimeException.class, timeout=5000L)
    public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 5180L, true);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
        }
    }

    @Test
    public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 10L, null, true, (RowIngestionMeters)new SimpleRowIngestionMeters(), true);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
        }
    }

    @Test
    public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100, 10000L, true);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            int nullHandlingOverhead = 1;
            int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            int sinkSizeOverhead = 5000;
            Assert.assertEquals((long)(currentInMemoryIndexSize + sinkSizeOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.close();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
        }
    }

    @Test
    public void testMaxBytesInMemoryInMultipleSinks() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(1000, 28748L, true);){
            int i;
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            int nullHandlingOverhead = 1;
            int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            int sinkSizeOverhead = 10000;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals((long)(2 * currentInMemoryIndexSize + sinkSizeOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            for (i = 0; i < 49; ++i) {
                appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar_" + i, 1), null);
                appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar_" + i, 1), null);
            }
            currentInMemoryIndexSize = 0;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bob", 1), null);
            currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(1)));
            sinkSizeOverhead = 5000;
            Assert.assertEquals((long)(currentInMemoryIndexSize + sinkSizeOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bob", 1), null);
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals((long)(2 * currentInMemoryIndexSize + (sinkSizeOverhead += 5000)), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            for (i = 0; i < 49; ++i) {
                appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar_" + i, 1), null);
                appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar_" + i, 1), null);
            }
            currentInMemoryIndexSize = 0;
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals((long)currentInMemoryIndexSize, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            appenderator.close();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
        }
    }

    @Test
    public void testIgnoreMaxBytesInMemory() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100, -1L, true);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            int nullHandlingOverhead = 1;
            Assert.assertEquals((long)(182 + nullHandlingOverhead), (long)((BatchAppenderator)appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            int sinkSizeOverhead = 10000;
            Assert.assertEquals((long)(364 + 2 * nullHandlingOverhead + sinkSizeOverhead), (long)((BatchAppenderator)appenderator).getBytesCurrentlyInMemory());
            Assert.assertEquals((long)2L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.close();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        }
    }

    @Test
    public void testMaxRowsInMemory() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(3, false);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            Assert.assertEquals((long)2L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            Assert.assertEquals((long)2L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bat", 1), null);
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "baz", 1), null);
            Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "qux", 1), null);
            Assert.assertEquals((long)2L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bob", 1), null);
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.close();
        }
    }

    @Test
    public void testAllHydrantsAreRecovered() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(1, false);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo2", 1), null);
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo3", 1), null);
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata)appenderator.push((Collection)appenderator.getSegments(), null, false).get();
            Assert.assertEquals(IDENTIFIERS.subList(0, 1), Lists.transform((List)segmentsAndCommitMetadata.getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.close();
        }
    }

    @Test
    public void testTotalRowsPerSegment() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(3, false);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Appenderator.AppenderatorAddResult addResult0 = appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)1L, (long)addResult0.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult addResult1 = appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            Assert.assertEquals((long)2L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)1L, (long)addResult1.getNumRowsInSegment());
            addResult1 = appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            Assert.assertEquals((long)2L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)1L, (long)addResult1.getNumRowsInSegment());
            addResult1 = appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bat", 1), null);
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)2L, (long)addResult1.getNumRowsInSegment());
            addResult1 = appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bat", 1), null);
            Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)3L, (long)addResult1.getNumRowsInSegment());
            addResult0 = appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "baz", 1), null);
            Assert.assertEquals((long)2L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)2L, (long)addResult0.getNumRowsInSegment());
            addResult1 = appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "qux", 1), null);
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)4L, (long)addResult1.getNumRowsInSegment());
            addResult0 = appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bob", 1), null);
            Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
            Assert.assertEquals((long)3L, (long)addResult0.getNumRowsInSegment());
            appenderator.close();
            Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        }
    }

    @Test
    public void testRestoreFromDisk() throws Exception {
        BatchAppenderatorTester tester = new BatchAppenderatorTester(2, false);
        Appenderator appenderator = tester.getAppenderator();
        appenderator.startJob();
        appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
        appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar", 2), null);
        Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "baz", 3), null);
        appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "qux", 4), null);
        Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "bob", 5), null);
        Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        appenderator.persistAll(null).get();
        Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        List segmentPaths = ((BatchAppenderator)appenderator).getPersistedidentifierPaths();
        Assert.assertNotNull((Object)segmentPaths);
        Assert.assertEquals((long)3L, (long)segmentPaths.size());
        appenderator.push(IDENTIFIERS, null, false).get();
        segmentPaths = ((BatchAppenderator)appenderator).getPersistedidentifierPaths();
        Assert.assertNotNull((Object)segmentPaths);
        Assert.assertEquals((long)0L, (long)segmentPaths.size());
        appenderator.close();
    }

    @Test
    public void testCleanupFromDiskAfterClose() throws Exception {
        BatchAppenderatorTester tester = new BatchAppenderatorTester(2, false);
        Appenderator appenderator = tester.getAppenderator();
        appenderator.startJob();
        appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
        appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar", 2), null);
        Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        Assert.assertEquals((long)2L, (long)appenderator.getTotalRowCount());
        appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "baz", 3), null);
        appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "qux", 4), null);
        Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        Assert.assertEquals((long)4L, (long)appenderator.getTotalRowCount());
        appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "bob", 5), null);
        Assert.assertEquals((long)1L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        appenderator.persistAll(null).get();
        Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        Assert.assertEquals((long)5L, (long)appenderator.getTotalRowCount());
        List segmentPaths = ((BatchAppenderator)appenderator).getPersistedidentifierPaths();
        Assert.assertNotNull((Object)segmentPaths);
        Assert.assertEquals((long)3L, (long)segmentPaths.size());
        appenderator.close();
        segmentPaths = ((BatchAppenderator)appenderator).getPersistedidentifierPaths();
        Assert.assertNotNull((Object)segmentPaths);
        Assert.assertEquals((long)0L, (long)segmentPaths.size());
        Assert.assertEquals((long)0L, (long)((BatchAppenderator)appenderator).getRowsInMemory());
        Assert.assertEquals((long)0L, (long)appenderator.getTotalRowCount());
    }

    @Test(timeout=5000L)
    public void testTotalRowCount() throws Exception {
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(3, false);){
            Appenderator appenderator = tester.getAppenderator();
            Assert.assertEquals((long)0L, (long)appenderator.getTotalRowCount());
            appenderator.startJob();
            Assert.assertEquals((long)0L, (long)appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            Assert.assertEquals((long)1L, (long)appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            Assert.assertEquals((long)2L, (long)appenderator.getTotalRowCount());
            appenderator.persistAll(null).get();
            Assert.assertEquals((long)2L, (long)appenderator.getTotalRowCount());
            appenderator.drop(IDENTIFIERS.get(0)).get();
            Assert.assertEquals((long)1L, (long)appenderator.getTotalRowCount());
            appenderator.drop(IDENTIFIERS.get(1)).get();
            Assert.assertEquals((long)0L, (long)appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "bar", 1), null);
            Assert.assertEquals((long)1L, (long)appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "baz", 1), null);
            Assert.assertEquals((long)2L, (long)appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "qux", 1), null);
            Assert.assertEquals((long)3L, (long)appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), BatchAppenderatorTest.createInputRow("2001", "bob", 1), null);
            Assert.assertEquals((long)4L, (long)appenderator.getTotalRowCount());
            appenderator.persistAll(null).get();
            Assert.assertEquals((long)4L, (long)appenderator.getTotalRowCount());
            appenderator.drop(IDENTIFIERS.get(2)).get();
            Assert.assertEquals((long)0L, (long)appenderator.getTotalRowCount());
            appenderator.close();
            Assert.assertEquals((long)0L, (long)appenderator.getTotalRowCount());
        }
    }

    @Test
    public void testVerifyRowIngestionMetrics() throws Exception {
        SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(5, 10000L, null, false, (RowIngestionMeters)rowIngestionMeters);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", "invalid_met"), null);
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "foo", 1), null);
            Assert.assertEquals((long)1L, (long)rowIngestionMeters.getProcessed());
            Assert.assertEquals((long)1L, (long)rowIngestionMeters.getProcessedWithError());
            Assert.assertEquals((long)0L, (long)rowIngestionMeters.getUnparseable());
            Assert.assertEquals((long)0L, (long)rowIngestionMeters.getThrownAway());
        }
    }

    @Test(timeout=10000L)
    public void testPushContract() throws Exception {
        SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(1, 50000L, null, false, (RowIngestionMeters)rowIngestionMeters);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar2", 1), null);
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar3", 1), null);
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata)appenderator.push(Collections.singletonList(IDENTIFIERS.get(0)), null, false).get();
            Assert.assertEquals(Collections.singletonList(IDENTIFIERS.get(0)), Lists.transform((List)segmentsAndCommitMetadata.getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
            appenderator.drop(IDENTIFIERS.get(0));
            Assert.assertEquals(Collections.singletonList(IDENTIFIERS.get(1)), (Object)appenderator.getSegments());
        }
    }

    @Test(timeout=5000L)
    public void testCloseContract() throws Exception {
        SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
        try (BatchAppenderatorTester tester = new BatchAppenderatorTester(1, 50000L, null, false, (RowIngestionMeters)rowIngestionMeters);){
            Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar", 1), null);
            appenderator.add(IDENTIFIERS.get(0), BatchAppenderatorTest.createInputRow("2000", "bar2", 1), null);
            ListenableFuture firstFuture = appenderator.push(Collections.singletonList(IDENTIFIERS.get(0)), null, false);
            appenderator.add(IDENTIFIERS.get(1), BatchAppenderatorTest.createInputRow("2000", "bar3", 1), null);
            ListenableFuture secondFuture = appenderator.push(Collections.singletonList(IDENTIFIERS.get(1)), null, false);
            appenderator.close();
            Assert.assertTrue((!firstFuture.isCancelled() ? 1 : 0) != 0);
            Assert.assertTrue((!secondFuture.isCancelled() ? 1 : 0) != 0);
            Assert.assertTrue((boolean)firstFuture.isDone());
            Assert.assertTrue((boolean)secondFuture.isDone());
            SegmentsAndCommitMetadata segmentsAndCommitMetadataForFirstFuture = (SegmentsAndCommitMetadata)firstFuture.get();
            SegmentsAndCommitMetadata segmentsAndCommitMetadataForSecondFuture = (SegmentsAndCommitMetadata)secondFuture.get();
            Assert.assertEquals((long)segmentsAndCommitMetadataForFirstFuture.getSegments().size(), (long)1L);
            Assert.assertEquals((long)segmentsAndCommitMetadataForSecondFuture.getSegments().size(), (long)1L);
        }
    }

    private static SegmentIdWithShardSpec createNonUTCSegmentId(String interval, String version, int partitionNum) {
        return new SegmentIdWithShardSpec("foo", new Interval((Object)interval, (Chronology)ISOChronology.getInstance((DateTimeZone)DateTimes.inferTzFromString((String)"Asia/Seoul"))), version, (ShardSpec)new LinearShardSpec(Integer.valueOf(partitionNum)));
    }

    private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum) {
        return new SegmentIdWithShardSpec("foo", Intervals.of((String)interval), version, (ShardSpec)new LinearShardSpec(Integer.valueOf(partitionNum)));
    }

    static InputRow createInputRow(String ts, String dim, Object met) {
        return new MapBasedInputRow(DateTimes.of((String)ts).getMillis(), (List)ImmutableList.of((Object)"dim"), (Map)ImmutableMap.of((Object)"dim", (Object)dim, (Object)"met", (Object)met));
    }
}

