/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.segment.realtime.appenderator;

import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Order;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
import org.apache.druid.segment.realtime.sink.Committers;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.TemporaryFolder;

public class StreamAppenderatorTest
extends InitializedNullHandlingTest {
    /**
     * Segment identifiers shared by the tests in this class.
     *
     * <p>The first two entries use the interval string {@code "2000/2001"} and differ only in the
     * trailing number (0 and 1); the third uses {@code "2001/2002"}. The arguments are presumably
     * (interval, version, partition number) -- confirm against the {@code si} helper.
     *
     * <p>The elements are passed without {@code Object} casts so the list is inferred as
     * {@code ImmutableList<SegmentIdWithShardSpec>} and matches the declared field type.
     */
    private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
            StreamAppenderatorTest.si("2000/2001", "A", 0),
            StreamAppenderatorTest.si("2000/2001", "A", 1),
            StreamAppenderatorTest.si("2001/2002", "A", 0)
    );

    /** JUnit temporary-folder rule; each test requests a fresh folder to use as its base persist directory. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Walks through the basic appenderator life cycle: start the job, add three rows across two
     * segment identifiers, check segment and row bookkeeping, push, and finally clear.
     *
     * <p>The commit metadata map is updated before each add; the push is expected to report the
     * last value written ({@code x=3}).
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testSimpleIngestion() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(2)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
            final Supplier<Committer> committerSupplier =
                    StreamAppenderatorTest.committerSupplierFromConcurrentMap(commitMetadata);

            // startJob
            Assert.assertNull(appenderator.startJob());

            // getDataSource
            Assert.assertEquals("foo", appenderator.getDataSource());

            // add: the result reports the running row count of the target segment
            commitMetadata.put("x", "1");
            Assert.assertEquals(
                    1,
                    appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier)
                            .getNumRowsInSegment());

            commitMetadata.put("x", "2");
            Assert.assertEquals(
                    2,
                    appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "bar", 2), committerSupplier)
                            .getNumRowsInSegment());

            commitMetadata.put("x", "3");
            Assert.assertEquals(
                    1,
                    appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "qux", 4), committerSupplier)
                            .getNumRowsInSegment());

            // getSegments
            Assert.assertEquals(IDENTIFIERS.subList(0, 2), StreamAppenderatorTest.sorted(appenderator.getSegments()));

            // getRowCount
            Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
            Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
            // Nothing was ever added for the third identifier, so asking for its row count must fail.
            Assert.assertThrows(
                    IllegalStateException.class,
                    () -> appenderator.getRowCount(IDENTIFIERS.get(2)));

            // push
            final SegmentsAndCommitMetadata segmentsAndCommitMetadata =
                    appenderator.push(appenderator.getSegments(), committerSupplier.get(), false).get();
            Assert.assertEquals(ImmutableMap.of("x", "3"), segmentsAndCommitMetadata.getCommitMetadata());
            Assert.assertEquals(
                    IDENTIFIERS.subList(0, 2),
                    StreamAppenderatorTest.sorted(
                            Lists.transform(
                                    segmentsAndCommitMetadata.getSegments(),
                                    SegmentIdWithShardSpec::fromDataSegment)));
            Assert.assertEquals(
                    StreamAppenderatorTest.sorted(tester.getPushedSegments()),
                    StreamAppenderatorTest.sorted(segmentsAndCommitMetadata.getSegments()));

            // clear
            appenderator.clear();
            Assert.assertTrue(appenderator.getSegments().isEmpty());
        }
    }

    /**
     * Same ingestion sequence as the simple-ingestion test, but the tester is built with
     * {@code enablePushFailure(true)}. The future returned by {@code push} is expected to fail with
     * an {@link ExecutionException} whose cause's cause is an {@link IOException} with a message
     * starting with {@code "Push failure test"}.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testPushFailure() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(2)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .enablePushFailure(true)
                .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
            final Supplier<Committer> committerSupplier =
                    StreamAppenderatorTest.committerSupplierFromConcurrentMap(commitMetadata);

            // startJob
            Assert.assertNull(appenderator.startJob());

            // getDataSource
            Assert.assertEquals("foo", appenderator.getDataSource());

            // add
            commitMetadata.put("x", "1");
            Assert.assertEquals(
                    1,
                    appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier)
                            .getNumRowsInSegment());

            commitMetadata.put("x", "2");
            Assert.assertEquals(
                    2,
                    appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "bar", 2), committerSupplier)
                            .getNumRowsInSegment());

            commitMetadata.put("x", "3");
            Assert.assertEquals(
                    1,
                    appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "qux", 4), committerSupplier)
                            .getNumRowsInSegment());

            // getSegments
            Assert.assertEquals(IDENTIFIERS.subList(0, 2), StreamAppenderatorTest.sorted(appenderator.getSegments()));

            // getRowCount
            Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
            Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
            // Nothing was ever added for the third identifier, so asking for its row count must fail.
            Assert.assertThrows(
                    IllegalStateException.class,
                    () -> appenderator.getRowCount(IDENTIFIERS.get(2)));

            // push: the failure surfaces only when the returned future is resolved
            final ListenableFuture<SegmentsAndCommitMetadata> pushFuture =
                    appenderator.push(appenderator.getSegments(), committerSupplier.get(), false);
            final ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> pushFuture.get());

            MatcherAssert.assertThat(
                    e,
                    ThrowableCauseMatcher.hasCause(
                            ThrowableCauseMatcher.hasCause(CoreMatchers.instanceOf(IOException.class))));
            MatcherAssert.assertThat(
                    e,
                    ThrowableCauseMatcher.hasCause(
                            ThrowableCauseMatcher.hasCause(
                                    ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith("Push failure test")))));
        }
    }

    /**
     * With {@code skipBytesInMemoryOverheadCheck(true)}, checks the per-segment in-memory byte count
     * after adding one row to each of two segment identifiers, and that no rows remain in memory
     * after {@code close()}.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(1024L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .skipBytesInMemoryOverheadCheck(true)
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);
            final int nullHandlingOverhead = 1;
            // Expected size of a single-row in-memory index for one segment.
            final int expectedBytesPerSegment = 182 + nullHandlingOverhead;
            Assert.assertEquals(expectedBytesPerSegment, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));

            appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "bar", 1), committerSupplier);
            Assert.assertEquals(expectedBytesPerSegment, appenderator.getBytesInMemory(IDENTIFIERS.get(1)));

            appenderator.close();
            Assert.assertEquals(0, appenderator.getRowsInMemory());
        }
    }

    /**
     * With {@code skipBytesInMemoryOverheadCheck(true)}, checks the appenderator-wide byte count
     * after adding one row to each of two segment identifiers: the expected total doubles from the
     * first add to the second. Also checks that no rows remain in memory after {@code close()}.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(1024L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .skipBytesInMemoryOverheadCheck(true)
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);
            final int nullHandlingOverhead = 1;
            Assert.assertEquals(182 + nullHandlingOverhead, appenderator.getBytesCurrentlyInMemory());

            appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "bar", 1), committerSupplier);
            Assert.assertEquals(364 + 2 * nullHandlingOverhead, appenderator.getBytesCurrentlyInMemory());

            appenderator.close();
            Assert.assertEquals(0, appenderator.getRowsInMemory());
        }
    }

    /**
     * Tracks byte accounting for a single segment identifier with {@code maxSizeInBytes(15000)}.
     *
     * <p>The expected totals are composed of three parts: the current in-memory index size, a fixed
     * per-sink overhead of 5000, and a "mapped index" size that the assertions expect to grow by
     * 4012 each time the in-memory size drops back to zero (presumably after an incremental
     * persist -- confirm against {@code StreamAppenderator}).
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testMaxBytesInMemory() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(15000L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);

            // One row in memory: index bytes plus the fixed sink overhead.
            final int nullHandlingOverhead = 1;
            final int sinkSizeOverhead = 5000;
            int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead,
                    appenderator.getBytesCurrentlyInMemory());

            // After 53 more rows the in-memory index is expected to be empty again, with 4012 bytes
            // now accounted to the mapped index.
            for (int i = 0; i < 53; ++i) {
                appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "bar_" + i, 1), committerSupplier);
            }
            currentInMemoryIndexSize = 0;
            int mappedIndexSize = 4012;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
                    appenderator.getBytesCurrentlyInMemory());

            // A single new row brings the in-memory index back to the one-row size.
            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "bob", 1), committerSupplier);
            currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
                    appenderator.getBytesCurrentlyInMemory());

            // After 31 more rows the in-memory index is expected to be empty again, and the mapped
            // index size to have doubled.
            for (int i = 0; i < 31; ++i) {
                appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "bar_" + i, 1), committerSupplier);
            }
            currentInMemoryIndexSize = 0;
            mappedIndexSize = 8024;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
                    appenderator.getBytesCurrentlyInMemory());

            // Closing must reset both counters.
            appenderator.close();
            Assert.assertEquals(0, appenderator.getRowsInMemory());
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
        }
    }

    /**
     * With {@code maxSizeInBytes(5180)} and the overhead check left enabled, the very first
     * {@code add} is expected to throw a {@link RuntimeException}.
     *
     * @throws Exception if creating the temporary folder or starting the job fails
     */
    @Test
    public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(5180L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            appenderator.startJob();
            Assert.assertThrows(
                    RuntimeException.class,
                    () -> appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier));
        }
    }

    /**
     * With a tiny {@code maxSizeInBytes(10)} but {@code skipBytesInMemoryOverheadCheck(true)}, adds
     * must not throw, and the appenderator-wide byte count is expected to be zero after each add.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(10L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .skipBytesInMemoryOverheadCheck(true)
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            appenderator.startJob();

            // Expected to be 0 after each add -- presumably because the row is persisted straight
            // away once the 10-byte limit is exceeded; confirm against StreamAppenderator.
            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());

            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
        }
    }

    /**
     * Adds a single row, checks the byte count (index size plus a fixed sink overhead of 5000), then
     * closes the appenderator while that row is still in memory and checks that both the row and
     * byte counters are reset to zero.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(10000L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);

            // Still in memory: one row's index bytes plus the sink overhead.
            final int nullHandlingOverhead = 1;
            final int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            final int sinkSizeOverhead = 5000;
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead,
                    appenderator.getBytesCurrentlyInMemory());

            // Close with the row still in memory; both counters must drop to zero.
            appenderator.close();
            Assert.assertEquals(0, appenderator.getRowsInMemory());
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
        }
    }

    /**
     * Tracks byte accounting across two segment identifiers with {@code maxSizeInBytes(31100)}.
     *
     * <p>The expected totals are composed of the per-segment in-memory index sizes, a fixed overhead
     * of 10000 for the two sinks together, and a "mapped index" size that the assertions expect to
     * grow by 8024 each time both in-memory sizes drop back to zero (presumably after an incremental
     * persist -- confirm against {@code StreamAppenderator}).
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testMaxBytesInMemoryInMultipleSinks() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(31100L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };
            final SegmentIdWithShardSpec first = IDENTIFIERS.get(0);
            final SegmentIdWithShardSpec second = IDENTIFIERS.get(1);

            appenderator.startJob();
            appenderator.add(first, StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);
            appenderator.add(second, StreamAppenderatorTest.ir("2000", "bar", 1), committerSupplier);

            // One row in each sink.
            final int nullHandlingOverhead = 1;
            final int sinkSizeOverhead = 10000;
            int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(first));
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(second));
            Assert.assertEquals(
                    2 * currentInMemoryIndexSize + sinkSizeOverhead,
                    appenderator.getBytesCurrentlyInMemory());

            // After 49 more rows per sink, both in-memory indexes are expected to be empty again.
            for (int i = 0; i < 49; ++i) {
                appenderator.add(first, StreamAppenderatorTest.ir("2000", "bar_" + i, 1), committerSupplier);
                appenderator.add(second, StreamAppenderatorTest.ir("2000", "bar_" + i, 1), committerSupplier);
            }
            currentInMemoryIndexSize = 0;
            int mappedIndexSize = 8024;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(first));
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(second));
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
                    appenderator.getBytesCurrentlyInMemory());

            // A new row in the first sink only: the second sink stays at zero in-memory bytes.
            appenderator.add(first, StreamAppenderatorTest.ir("2000", "bob", 1), committerSupplier);
            currentInMemoryIndexSize = 182 + nullHandlingOverhead;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(first));
            Assert.assertEquals(0, appenderator.getBytesInMemory(second));
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
                    appenderator.getBytesCurrentlyInMemory());

            // A new row in the second sink as well: both sinks now hold one row.
            appenderator.add(second, StreamAppenderatorTest.ir("2000", "bob", 1), committerSupplier);
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(first));
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(second));
            Assert.assertEquals(
                    2 * currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
                    appenderator.getBytesCurrentlyInMemory());

            // After 34 more rows per sink, both in-memory indexes are expected to be empty again and
            // the mapped index size to have doubled.
            for (int i = 0; i < 34; ++i) {
                appenderator.add(first, StreamAppenderatorTest.ir("2000", "bar_" + i, 1), committerSupplier);
                appenderator.add(second, StreamAppenderatorTest.ir("2000", "bar_" + i, 1), committerSupplier);
            }
            currentInMemoryIndexSize = 0;
            mappedIndexSize = 16048;
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(first));
            Assert.assertEquals(currentInMemoryIndexSize, appenderator.getBytesInMemory(second));
            Assert.assertEquals(
                    currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
                    appenderator.getBytesCurrentlyInMemory());

            // Closing must reset both counters.
            appenderator.close();
            Assert.assertEquals(0, appenderator.getRowsInMemory());
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
        }
    }

    /**
     * With {@code maxSizeInBytes(-1)}, adds one row to each of two segment identifiers and checks
     * that rows stay in memory (row count 1, then 2) and that the byte total equals both index sizes
     * plus a fixed overhead of 10000 for the two sinks. Closing must leave zero rows in memory.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testIgnoreMaxBytesInMemory() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(100)
                .maxSizeInBytes(-1L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                // Keyed by the literal "eventCount" rather than by the mutable counter object.
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            // Nothing is in memory before or immediately after the job starts.
            Assert.assertEquals(0, appenderator.getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals(0, appenderator.getRowsInMemory());

            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);
            final int nullHandlingOverhead = 1;
            Assert.assertEquals(182 + nullHandlingOverhead, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(1, appenderator.getRowsInMemory());

            appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "bar", 1), committerSupplier);
            final int sinkSizeOverhead = 10000;
            Assert.assertEquals(
                    364 + 2 * nullHandlingOverhead + sinkSizeOverhead,
                    appenderator.getBytesCurrentlyInMemory());
            Assert.assertEquals(2, appenderator.getRowsInMemory());

            appenderator.close();
            Assert.assertEquals(0, appenderator.getRowsInMemory());
        }
    }

    /**
     * Follows the rows-in-memory counter with {@code maxRowsInMemory(3)} and incremental persists
     * allowed (three-argument {@code add}).
     *
     * <p>Expected counter values after each step: 1, 2, 2 (the third add repeats the previous row
     * for the same identifier), 0, 1, 2, and finally 0 after an explicit {@code persistAll}.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testMaxRowsInMemory() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(3)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator streamAppenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committers = () -> {
                final Object snapshot = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return snapshot;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };
            final SegmentIdWithShardSpec first = IDENTIFIERS.get(0);
            final SegmentIdWithShardSpec second = IDENTIFIERS.get(1);

            Assert.assertEquals(0, streamAppenderator.getRowsInMemory());
            streamAppenderator.startJob();
            Assert.assertEquals(0, streamAppenderator.getRowsInMemory());

            streamAppenderator.add(first, StreamAppenderatorTest.ir("2000", "foo", 1), committers);
            Assert.assertEquals(1, streamAppenderator.getRowsInMemory());

            streamAppenderator.add(second, StreamAppenderatorTest.ir("2000", "bar", 1), committers);
            Assert.assertEquals(2, streamAppenderator.getRowsInMemory());

            // Same row, same identifier: the counter is expected to stay at 2.
            streamAppenderator.add(second, StreamAppenderatorTest.ir("2000", "bar", 1), committers);
            Assert.assertEquals(2, streamAppenderator.getRowsInMemory());

            // This add is expected to leave nothing in memory.
            streamAppenderator.add(first, StreamAppenderatorTest.ir("2000", "baz", 1), committers);
            Assert.assertEquals(0, streamAppenderator.getRowsInMemory());

            streamAppenderator.add(second, StreamAppenderatorTest.ir("2000", "qux", 1), committers);
            Assert.assertEquals(1, streamAppenderator.getRowsInMemory());

            streamAppenderator.add(first, StreamAppenderatorTest.ir("2000", "bob", 1), committers);
            Assert.assertEquals(2, streamAppenderator.getRowsInMemory());

            streamAppenderator.persistAll(committers.get());
            Assert.assertEquals(0, streamAppenderator.getRowsInMemory());

            streamAppenderator.close();
        }
    }

    /**
     * Follows the rows-in-memory counter with {@code maxRowsInMemory(3)} when every row is added
     * through the four-argument {@code add} with {@code false} as the last argument.
     *
     * <p>Expected counter values after each step: 1, 2, 2 (the third add repeats the previous row
     * for the same identifier), 3, 4, 5 -- the counter keeps growing past the configured maximum --
     * and finally 0 after an explicit {@code persistAll}.
     *
     * @throws Exception if creating the temporary folder or any appenderator operation fails
     */
    @Test
    public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(3)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final AtomicInteger eventCount = new AtomicInteger(0);
            // Each committer snapshots the event count when it is created; committing is a no-op.
            final Supplier<Committer> committerSupplier = () -> {
                final Object metadata = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return metadata;
                    }

                    @Override
                    public void run() {
                        // Nothing to commit in this test.
                    }
                };
            };

            Assert.assertEquals(0, appenderator.getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals(0, appenderator.getRowsInMemory());

            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier, false);
            Assert.assertEquals(1, appenderator.getRowsInMemory());

            appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "bar", 1), committerSupplier, false);
            Assert.assertEquals(2, appenderator.getRowsInMemory());

            // Same row, same identifier: the counter is expected to stay at 2.
            appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "bar", 1), committerSupplier, false);
            Assert.assertEquals(2, appenderator.getRowsInMemory());

            // From here on the counter is expected to reach and exceed maxRowsInMemory without resetting.
            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "baz", 1), committerSupplier, false);
            Assert.assertEquals(3, appenderator.getRowsInMemory());

            appenderator.add(IDENTIFIERS.get(1), StreamAppenderatorTest.ir("2000", "qux", 1), committerSupplier, false);
            Assert.assertEquals(4, appenderator.getRowsInMemory());

            appenderator.add(IDENTIFIERS.get(0), StreamAppenderatorTest.ir("2000", "bob", 1), committerSupplier, false);
            Assert.assertEquals(5, appenderator.getRowsInMemory());

            // Only the explicit persist empties the in-memory rows.
            appenderator.persistAll(committerSupplier.get());
            Assert.assertEquals(0, appenderator.getRowsInMemory());

            appenderator.close();
        }
    }

    /**
     * Adds five rows to one segment, closes the appenderator, then builds a second tester over
     * the same base persist directory and checks what the new appenderator reports on start-up:
     * the commit metadata, the list of segments, and the segment's row count.
     */
    @Test
    public void testRestoreFromDisk() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(2)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            final AppenderatorConfig tuningConfig = tester.getTuningConfig();
            final AtomicInteger eventCount = new AtomicInteger(0);

            // Every committer carries a snapshot of the event counter taken when it is requested.
            final Supplier<Committer> committerSupplier = () -> {
                final Object snapshot = ImmutableMap.of("eventCount", eventCount.get());
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return snapshot;
                    }

                    @Override
                    public void run() {
                        // nothing to commit in this test
                    }
                };
            };

            appenderator.startJob();
            // Row i carries dim dims[i] and metric value i + 1; the counter is bumped before each add.
            final String[] dims = {"foo", "bar", "baz", "qux", "bob"};
            for (int i = 0; i < dims.length; i++) {
                eventCount.incrementAndGet();
                appenderator.add(
                        IDENTIFIERS.get(0),
                        StreamAppenderatorTest.ir("2000", dims[i], i + 1),
                        committerSupplier);
            }
            appenderator.close();

            try (StreamAppenderatorTester restoredTester = new StreamAppenderatorTester.Builder()
                    .maxRowsInMemory(2)
                    .basePersistDirectory(tuningConfig.getBasePersistDirectory())
                    .build()) {
                final Appenderator restored = restoredTester.getAppenderator();
                // Four of the five events are expected back.
                // NOTE(review): presumably the fifth row was never persisted before close() - confirm.
                Assert.assertEquals(ImmutableMap.of("eventCount", 4), restored.startJob());
                Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), restored.getSegments());
                Assert.assertEquals(4L, (long) restored.getRowCount(IDENTIFIERS.get(0)));
            }
        }
    }

    /**
     * Follows {@code getTotalRowCount()} through adds, persists, drops and close. The asserted
     * values show the total rising by one per added row here, staying put across
     * {@code persistAll}, and falling by the dropped segment's row count on {@code drop}.
     */
    @Test(timeout = 60000L)
    public void testTotalRowCount() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(3)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            final ConcurrentHashMap<String, String> commitMetadata = new ConcurrentHashMap<String, String>();
            final Supplier<Committer> committerSupplier =
                    StreamAppenderatorTest.committerSupplierFromConcurrentMap(commitMetadata);
            final SegmentIdWithShardSpec firstId = IDENTIFIERS.get(0);
            final SegmentIdWithShardSpec secondId = IDENTIFIERS.get(1);
            final SegmentIdWithShardSpec thirdId = IDENTIFIERS.get(2);

            Assert.assertEquals(0L, (long) appenderator.getTotalRowCount());
            appenderator.startJob();
            Assert.assertEquals(0L, (long) appenderator.getTotalRowCount());

            // One row into each of the first two segments.
            appenderator.add(firstId, StreamAppenderatorTest.ir("2000", "foo", 1), committerSupplier);
            Assert.assertEquals(1L, (long) appenderator.getTotalRowCount());
            appenderator.add(secondId, StreamAppenderatorTest.ir("2000", "bar", 1), committerSupplier);
            Assert.assertEquals(2L, (long) appenderator.getTotalRowCount());

            // Persisting must not change the total; dropping removes one segment's rows at a time.
            appenderator.persistAll(committerSupplier.get()).get();
            Assert.assertEquals(2L, (long) appenderator.getTotalRowCount());
            appenderator.drop(firstId).get();
            Assert.assertEquals(1L, (long) appenderator.getTotalRowCount());
            appenderator.drop(secondId).get();
            Assert.assertEquals(0L, (long) appenderator.getTotalRowCount());

            // Four rows into the third segment, checking the running total after each add.
            final String[] dims = {"bar", "baz", "qux", "bob"};
            for (int i = 0; i < dims.length; i++) {
                appenderator.add(thirdId, StreamAppenderatorTest.ir("2001", dims[i], 1), committerSupplier);
                Assert.assertEquals((long) (i + 1), (long) appenderator.getTotalRowCount());
            }

            appenderator.persistAll(committerSupplier.get()).get();
            Assert.assertEquals(4L, (long) appenderator.getTotalRowCount());
            appenderator.drop(thirdId).get();
            Assert.assertEquals(0L, (long) appenderator.getTotalRowCount());

            appenderator.close();
            Assert.assertEquals(0L, (long) appenderator.getTotalRowCount());
        }
    }

    /**
     * Feeds one row whose metric value is the string {@code "invalid_met"} and one row with a
     * numeric metric, then checks the counters exposed by the supplied row ingestion meters:
     * one processed, one processed-with-error, none unparseable, none thrown away.
     */
    @Test
    public void testVerifyRowIngestionMetrics() throws Exception {
        final SimpleRowIngestionMeters meters = new SimpleRowIngestionMeters();
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(5)
                .maxSizeInBytes(10000L)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .rowIngestionMeters(meters)
                .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();

            final SegmentIdWithShardSpec target = IDENTIFIERS.get(0);
            // First row: metric is not a number. Second row: well-formed.
            appenderator.add(target, StreamAppenderatorTest.ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
            appenderator.add(target, StreamAppenderatorTest.ir("2000", "foo", 1), Committers.nilSupplier());

            Assert.assertEquals(1L, (long) meters.getProcessed());
            Assert.assertEquals(1L, (long) meters.getProcessedWithError());
            Assert.assertEquals(0L, (long) meters.getUnparseable());
            Assert.assertEquals(0L, (long) meters.getThrownAway());
        }
    }

    /**
     * Exercises dropping a segment when the tester is configured with a 1000 ms segment drop
     * delay. Immediately after {@code drop(...)} completes, a query over 2000/2001 is still
     * expected to see all three rows; once the last task scheduled on the injected executor has
     * finished, only the row of the remaining segment is expected.
     */
    @Test
    public void testDelayedDrop() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(2)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .withSegmentDropDelayInMilli(1000)
                .build()) {
            final Appenderator appenderator = tester.getAppenderator();

            // Single-threaded scheduler that remembers the most recent future returned by
            // schedule(Runnable, long, TimeUnit), so the test can wait for that task.
            class RecordingScheduledExecutor extends ScheduledThreadPoolExecutor {
                ScheduledFuture<?> lastScheduled;

                public RecordingScheduledExecutor() {
                    super(1);
                }

                @Override
                public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
                    final ScheduledFuture<?> scheduled = super.schedule(command, delay, unit);
                    this.lastScheduled = scheduled;
                    return scheduled;
                }

                ScheduledFuture<?> getLastScheduledFuture() {
                    return this.lastScheduled;
                }
            }

            // NOTE(review): this executor is never shut down in the test itself; confirm that the
            // appenderator/tester takes ownership of it.
            final RecordingScheduledExecutor recordingExec = new RecordingScheduledExecutor();
            ((StreamAppenderator) appenderator).setExec(recordingExec);
            appenderator.startJob();

            final Supplier<Committer> nilCommitter = Suppliers.ofInstance(Committers.nil());
            final SegmentIdWithShardSpec firstId = IDENTIFIERS.get(0);
            final SegmentIdWithShardSpec secondId = IDENTIFIERS.get(1);
            final SegmentIdWithShardSpec thirdId = IDENTIFIERS.get(2);
            appenderator.add(firstId, StreamAppenderatorTest.ir("2000", "foo", 1), nilCommitter);
            appenderator.add(firstId, StreamAppenderatorTest.ir("2000", "foo", 2), nilCommitter);
            appenderator.add(secondId, StreamAppenderatorTest.ir("2000", "foo", 4), nilCommitter);
            appenderator.add(thirdId, StreamAppenderatorTest.ir("2001", "foo", 8), nilCommitter);
            appenderator.add(thirdId, StreamAppenderatorTest.ir("2001T01", "foo", 16), nilCommitter);
            appenderator.add(thirdId, StreamAppenderatorTest.ir("2001T02", "foo", 32), nilCommitter);
            appenderator.add(thirdId, StreamAppenderatorTest.ir("2001T03", "foo", 64), nilCommitter);

            final TimeseriesQuery year2000Query = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2001")))
                    .aggregators(Arrays.asList(
                            new LongSumAggregatorFactory("count", "count"),
                            new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();

            appenderator.drop(firstId).get();

            // Right after the drop call returns, the dropped segment's rows (met 1 + 2) are still served.
            final List<Result<TimeseriesResultValue>> beforeDelayElapsed =
                    QueryPlus.wrap(year2000Query).run(appenderator, ResponseContext.createEmpty()).toList();
            final List<Result<TimeseriesResultValue>> expectedBefore = ImmutableList.of(
                    new Result<>(
                            DateTimes.of("2000"),
                            new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))));
            Assert.assertEquals("query1", expectedBefore, beforeDelayElapsed);

            // Wait (at most 5 s) for the most recently scheduled task to complete.
            recordingExec.getLastScheduledFuture().get(5000L, TimeUnit.MILLISECONDS);

            final List<Result<TimeseriesResultValue>> afterDelayElapsed =
                    QueryPlus.wrap(year2000Query).run(appenderator, ResponseContext.createEmpty()).toList();
            final List<Result<TimeseriesResultValue>> expectedAfter = ImmutableList.of(
                    new Result<>(
                            DateTimes.of("2000"),
                            new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L))));
            Assert.assertEquals("query after dropped", expectedAfter, afterDelayElapsed);
        }
    }

    /**
     * Checks that segment-specific queries return the same rows whether a segment is addressed
     * by its original descriptor or by the descriptor of a pending segment registered as an
     * upgrade of it (version "B" / partition 1 and version "C" / partition 7 in this test).
     *
     * <p>Four query shapes are run against each of the three descriptors: a timeseries over the
     * whole segment interval, a timeseries over the first hour, a timeseries over the first and
     * fourth hour, and a scan over the first and fourth hour.
     */
    @Test
    public void testQueryBySegments_withSegmentVersionUpgrades() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(2)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final Supplier<Committer> nilCommitter = Suppliers.ofInstance(Committers.nil());
            final SegmentIdWithShardSpec id0 = IDENTIFIERS.get(0);
            final SegmentIdWithShardSpec id2 = IDENTIFIERS.get(2);
            final String id0Text = id0.asSegmentId().toString();
            final String id2Text = id2.asSegmentId().toString();

            appenderator.startJob();

            // Upgrades are registered in between adds on purpose, so rows are added both before
            // and after the corresponding upgraded pending segment is known.
            appenderator.add(id0, StreamAppenderatorTest.ir("2000", "foo", 1), nilCommitter);
            appenderator.add(id0, StreamAppenderatorTest.ir("2000", "foo", 2), nilCommitter);
            final SegmentIdWithShardSpec id0UpgradedB = StreamAppenderatorTest.si("2000/2001", "B", 1);
            appenderator.registerUpgradedPendingSegment(
                    PendingSegmentRecord.create(id0UpgradedB, id0UpgradedB.asSegmentId().toString(), id0Text, id0Text, "foo"));
            appenderator.add(
                    StreamAppenderatorTest.si("2000/2001", "B", 2),
                    StreamAppenderatorTest.ir("2000", "foo", 4),
                    nilCommitter);

            appenderator.add(id2, StreamAppenderatorTest.ir("2001", "foo", 8), nilCommitter);
            appenderator.add(id2, StreamAppenderatorTest.ir("2001T01", "foo", 16), nilCommitter);
            final SegmentIdWithShardSpec id2UpgradedB = StreamAppenderatorTest.si("2001/2002", "B", 1);
            appenderator.registerUpgradedPendingSegment(
                    PendingSegmentRecord.create(id2UpgradedB, id2UpgradedB.asSegmentId().toString(), id2Text, id2Text, "foo"));
            appenderator.add(id2, StreamAppenderatorTest.ir("2001T02", "foo", 32), nilCommitter);
            appenderator.add(id2, StreamAppenderatorTest.ir("2001T03", "foo", 64), nilCommitter);
            final SegmentIdWithShardSpec id2UpgradedC = StreamAppenderatorTest.si("2001/2002", "C", 7);
            appenderator.registerUpgradedPendingSegment(
                    PendingSegmentRecord.create(id2UpgradedC, id2UpgradedC.asSegmentId().toString(), id2Text, id2Text, "foo"));

            // The three ways of addressing the rows added to id2: original id, then each upgrade.
            final String[] suffixes = {"", "_B", "_C"};
            final String[] versions = {id2.getVersion(), "B", "C"};
            final int[] partitionNums = {id2.getShardSpec().getPartitionNum(), 1, 7};

            // query1: the whole segment interval -> all four rows (8 + 16 + 32 + 64).
            for (int i = 0; i < versions.length; i++) {
                final TimeseriesQuery query = timeseriesQueryForDescriptors(
                        new SegmentDescriptor(id2.getInterval(), versions[i], partitionNums[i]));
                Assert.assertEquals(
                        "query1" + suffixes[i],
                        expectedDayTotals("2001", 4L, 120L),
                        QueryPlus.wrap(query).run(appenderator, ResponseContext.createEmpty()).toList());
            }

            // query2: only the first hour -> the single row with met 8.
            for (int i = 0; i < versions.length; i++) {
                final TimeseriesQuery query = timeseriesQueryForDescriptors(
                        new SegmentDescriptor(Intervals.of("2001/PT1H"), versions[i], partitionNums[i]));
                Assert.assertEquals(
                        "query2" + suffixes[i],
                        expectedDayTotals("2001", 1L, 8L),
                        QueryPlus.wrap(query).run(appenderator, ResponseContext.createEmpty()).toList());
            }

            // query3: first and fourth hour -> two rows (8 + 64).
            for (int i = 0; i < versions.length; i++) {
                final TimeseriesQuery query = timeseriesQueryForDescriptors(
                        new SegmentDescriptor(Intervals.of("2001/PT1H"), versions[i], partitionNums[i]),
                        new SegmentDescriptor(Intervals.of("2001T03/PT1H"), versions[i], partitionNums[i]));
                Assert.assertEquals(
                        "query3" + suffixes[i],
                        expectedDayTotals("2001", 2L, 72L),
                        QueryPlus.wrap(query).run(appenderator, ResponseContext.createEmpty()).toList());
            }

            // query4: the same two hours as a scan, checking the raw rows.
            for (int i = 0; i < versions.length; i++) {
                final ScanQuery query = scanQueryForDescriptors(
                        new SegmentDescriptor(Intervals.of("2001/PT1H"), versions[i], partitionNums[i]),
                        new SegmentDescriptor(Intervals.of("2001T03/PT1H"), versions[i], partitionNums[i]));
                assertFirstAndFourthHourScan(
                        "query4" + suffixes[i],
                        QueryPlus.wrap(query).run(appenderator, ResponseContext.createEmpty()).toList());
            }
        }
    }

    /**
     * Builds a day-granularity timeseries query on datasource "foo" that long-sums "count" and
     * "met" over exactly the given segment descriptors.
     */
    private static TimeseriesQuery timeseriesQueryForDescriptors(SegmentDescriptor... descriptors) {
        return Druids.newTimeseriesQueryBuilder()
                .dataSource("foo")
                .aggregators(Arrays.asList(
                        new LongSumAggregatorFactory("count", "count"),
                        new LongSumAggregatorFactory("met", "met")))
                .granularity(Granularities.DAY)
                .intervals(new MultipleSpecificSegmentSpec(ImmutableList.copyOf(descriptors)))
                .build();
    }

    /**
     * Builds an ascending, compacted-list scan query (batch size 10) on datasource "foo" over
     * exactly the given segment descriptors.
     */
    private static ScanQuery scanQueryForDescriptors(SegmentDescriptor... descriptors) {
        return Druids.newScanQueryBuilder()
                .dataSource("foo")
                .intervals(new MultipleSpecificSegmentSpec(ImmutableList.copyOf(descriptors)))
                .order(Order.ASCENDING)
                .batchSize(10)
                .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                .build();
    }

    /** Expected timeseries output consisting of a single day bucket with the given totals. */
    private static List<Result<TimeseriesResultValue>> expectedDayTotals(String day, long count, long met) {
        final Result<TimeseriesResultValue> bucket = new Result<TimeseriesResultValue>(
                DateTimes.of(day),
                new TimeseriesResultValue(ImmutableMap.<String, Object>of("count", count, "met", met)));
        return ImmutableList.of(bucket);
    }

    /**
     * Asserts that a scan produced exactly two results, the first starting with the 2001 row
     * (met 8) and the second with the 2001T03 row (met 64), and that BOTH results expose the
     * columns {@code __time, dim, count, met}.
     *
     * @param label   identifies the query variant in assertion failure messages
     * @param results scan results to check
     */
    private static void assertFirstAndFourthHourScan(String label, List<ScanResultValue> results) {
        final String[] expectedColumns = {"__time", "dim", "count", "met"};
        Assert.assertEquals(label, 2L, (long) results.size());

        final ScanResultValue firstHour = results.get(0);
        Assert.assertArrayEquals(label, expectedColumns, firstHour.getColumns().toArray());
        Assert.assertArrayEquals(
                label,
                new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
                ((List<?>) ((List<?>) firstHour.getEvents()).get(0)).toArray());

        // Index 1, not 0: the previous version re-checked the first result's columns here and
        // therefore never verified the columns of the second result.
        final ScanResultValue fourthHour = results.get(1);
        Assert.assertArrayEquals(label, expectedColumns, fourthHour.getColumns().toArray());
        Assert.assertArrayEquals(
                label,
                new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
                ((List<?>) ((List<?>) fourthHour.getEvents()).get(0)).toArray());
    }

    /**
     * Runs interval-based timeseries queries against an appenderator that has upgraded pending
     * segments registered for two of its segments, and checks the totals both before and after
     * dropping the first segment together with its "B" upgrade.
     */
    @Test
    public void testQueryByIntervals_withSegmentVersionUpgrades() throws Exception {
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(2)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build()) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final Supplier<Committer> nilCommitter = Suppliers.ofInstance(Committers.nil());
            final SegmentIdWithShardSpec id0 = IDENTIFIERS.get(0);
            final SegmentIdWithShardSpec id2 = IDENTIFIERS.get(2);
            final String id0Text = id0.asSegmentId().toString();
            final String id2Text = id2.asSegmentId().toString();

            appenderator.startJob();

            // Rows with ts 2000: two into id0, then one into a separate "B"/2 segment after
            // id0 has been given the "B"/1 upgrade.
            appenderator.add(id0, StreamAppenderatorTest.ir("2000", "foo", 1), nilCommitter);
            appenderator.add(id0, StreamAppenderatorTest.ir("2000", "foo", 2), nilCommitter);
            final SegmentIdWithShardSpec id0UpgradedB = StreamAppenderatorTest.si("2000/2001", "B", 1);
            appenderator.registerUpgradedPendingSegment(
                    PendingSegmentRecord.create(id0UpgradedB, id0UpgradedB.asSegmentId().toString(), id0Text, id0Text, "foo"));
            appenderator.add(
                    StreamAppenderatorTest.si("2000/2001", "B", 2),
                    StreamAppenderatorTest.ir("2000", "foo", 4),
                    nilCommitter);

            // Rows with ts 2001..2001T03 into id2, with upgrades "B"/1 and "C"/7 registered along the way.
            appenderator.add(id2, StreamAppenderatorTest.ir("2001", "foo", 8), nilCommitter);
            appenderator.add(id2, StreamAppenderatorTest.ir("2001T01", "foo", 16), nilCommitter);
            final SegmentIdWithShardSpec id2UpgradedB = StreamAppenderatorTest.si("2001/2002", "B", 1);
            appenderator.registerUpgradedPendingSegment(
                    PendingSegmentRecord.create(id2UpgradedB, id2UpgradedB.asSegmentId().toString(), id2Text, id2Text, "foo"));
            appenderator.add(id2, StreamAppenderatorTest.ir("2001T02", "foo", 32), nilCommitter);
            appenderator.add(id2, StreamAppenderatorTest.ir("2001T03", "foo", 64), nilCommitter);
            final SegmentIdWithShardSpec id2UpgradedC = StreamAppenderatorTest.si("2001/2002", "C", 7);
            appenderator.registerUpgradedPendingSegment(
                    PendingSegmentRecord.create(id2UpgradedC, id2UpgradedC.asSegmentId().toString(), id2Text, id2Text, "foo"));

            // Expected day buckets, shared between the assertions below.
            final Result<TimeseriesResultValue> all2000 = new Result<>(
                    DateTimes.of("2000"), new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)));
            final Result<TimeseriesResultValue> remaining2000 = new Result<>(
                    DateTimes.of("2000"), new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 4L)));
            final Result<TimeseriesResultValue> all2001 = new Result<>(
                    DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)));
            final Result<TimeseriesResultValue> firstHour2001 = new Result<>(
                    DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)));
            final Result<TimeseriesResultValue> firstAndFourthHour2001 = new Result<>(
                    DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)));

            // query1: year 2000 only.
            final TimeseriesQuery year2000 = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2001")))
                    .aggregators(Arrays.asList(
                            new LongSumAggregatorFactory("count", "count"),
                            new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();
            final List<Result<TimeseriesResultValue>> year2000Rows =
                    QueryPlus.wrap(year2000).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals("query1", ImmutableList.of(all2000), year2000Rows);

            // query2: both years.
            final TimeseriesQuery bothYears = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2002")))
                    .aggregators(Arrays.asList(
                            new LongSumAggregatorFactory("count", "count"),
                            new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();
            final List<Result<TimeseriesResultValue>> bothYearsRows =
                    QueryPlus.wrap(bothYears).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals("query2", ImmutableList.of(all2000, all2001), bothYearsRows);

            // query3: year 2000 plus the first hour of 2001.
            final TimeseriesQuery throughFirstHour = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2001T01")))
                    .aggregators(Arrays.asList(
                            new LongSumAggregatorFactory("count", "count"),
                            new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();
            final List<Result<TimeseriesResultValue>> throughFirstHourRows =
                    QueryPlus.wrap(throughFirstHour).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(ImmutableList.of(all2000, firstHour2001), throughFirstHourRows);

            // query4: as query3, plus the fourth hour of 2001 as a second interval.
            final TimeseriesQuery twoIntervals = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2001T01"), Intervals.of("2001T03/2001T04")))
                    .aggregators(Arrays.asList(
                            new LongSumAggregatorFactory("count", "count"),
                            new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();
            final List<Result<TimeseriesResultValue>> twoIntervalsRows =
                    QueryPlus.wrap(twoIntervals).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(ImmutableList.of(all2000, firstAndFourthHour2001), twoIntervalsRows);

            // Drop id0 under both its original id and its "B"/1 upgrade; afterwards only the
            // row added to the separate "B"/2 segment (met 4) is expected for year 2000.
            appenderator.drop(id0).get();
            appenderator.drop(id0UpgradedB).get();

            final List<Result<TimeseriesResultValue>> year2000RowsAfterDrop =
                    QueryPlus.wrap(year2000).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals("query1", ImmutableList.of(remaining2000), year2000RowsAfterDrop);

            final List<Result<TimeseriesResultValue>> bothYearsRowsAfterDrop =
                    QueryPlus.wrap(bothYears).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals("query2", ImmutableList.of(remaining2000, all2001), bothYearsRowsAfterDrop);

            final List<Result<TimeseriesResultValue>> throughFirstHourRowsAfterDrop =
                    QueryPlus.wrap(throughFirstHour).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(ImmutableList.of(remaining2000, firstHour2001), throughFirstHourRowsAfterDrop);

            final List<Result<TimeseriesResultValue>> twoIntervalsRowsAfterDrop =
                    QueryPlus.wrap(twoIntervals).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(ImmutableList.of(remaining2000, firstAndFourthHour2001), twoIntervalsRowsAfterDrop);
        }
    }

    /**
     * Queries in-flight sinks by interval using day-granularity timeseries queries and, after each
     * query, verifies the per-sink metrics captured by the stub emitter.
     */
    @Test
    public void testQueryByIntervals() throws Exception {
        try (StubServiceEmitter serviceEmitter = new StubServiceEmitter();
             StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                     .maxRowsInMemory(2)
                     .basePersistDirectory(this.temporaryFolder.newFolder())
                     .withServiceEmitter(serviceEmitter)
                     .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();

            // Rows stamped 2000 go to IDENTIFIERS 0 and 1; rows stamped within 2001 go to IDENTIFIERS 2.
            final Supplier<Committer> nilCommitter = Suppliers.ofInstance(Committers.nil());
            appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), nilCommitter);
            appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), nilCommitter);
            appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), nilCommitter);

            final String segmentId0 = IDENTIFIERS.get(0).asSegmentId().toString();
            final String segmentId1 = IDENTIFIERS.get(1).asSegmentId().toString();
            final String segmentId2 = IDENTIFIERS.get(2).asSegmentId().toString();

            // The day bucket for 2000 is identical in every query below.
            final Result<TimeseriesResultValue> rowFor2000 =
                    new Result<>(DateTimes.of("2000"), new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)));

            // Query 1: 2000/2001 -- only the rows stamped 2000 are in range.
            final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2001")))
                    .aggregators(Arrays.asList(new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();
            final List<Result<TimeseriesResultValue>> results1 =
                    QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals("query1", ImmutableList.of(rowFor2000), results1);
            this.verifySinkMetrics(serviceEmitter, new HashSet<>(Arrays.asList(segmentId0, segmentId1)));
            serviceEmitter.flush();

            // Query 3: 2000/2001T01 -- additionally picks up the single row stamped exactly 2001.
            final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2001T01")))
                    .aggregators(Arrays.asList(new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();
            final List<Result<TimeseriesResultValue>> results3 =
                    QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(
                    ImmutableList.of(
                            rowFor2000,
                            new Result<>(DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)))),
                    results3);
            this.verifySinkMetrics(serviceEmitter, new HashSet<>(Arrays.asList(segmentId0, segmentId1, segmentId2)));
            serviceEmitter.flush();

            // Query 4: 2000/2001T01 plus 2001T03/2001T04 -- two disjoint intervals over the same sinks.
            final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .intervals(ImmutableList.of(Intervals.of("2000/2001T01"), Intervals.of("2001T03/2001T04")))
                    .aggregators(Arrays.asList(new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .build();
            final List<Result<TimeseriesResultValue>> results4 =
                    QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(
                    ImmutableList.of(
                            rowFor2000,
                            new Result<>(DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)))),
                    results4);
            this.verifySinkMetrics(serviceEmitter, new HashSet<>(Arrays.asList(segmentId0, segmentId1, segmentId2)));
        }
    }

    /**
     * Queries one in-flight sink through explicit segment descriptors (its whole interval, a single
     * hour, and two disjoint hours) with timeseries and scan queries, verifying the emitted sink
     * metrics after each query.
     */
    @Test
    public void testQueryBySegments() throws Exception {
        try (StubServiceEmitter serviceEmitter = new StubServiceEmitter();
             StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                     .maxRowsInMemory(2)
                     .basePersistDirectory(this.temporaryFolder.newFolder())
                     .withServiceEmitter(serviceEmitter)
                     .build()) {
            final Appenderator appenderator = tester.getAppenderator();
            appenderator.startJob();

            final Supplier<Committer> nilCommitter = Suppliers.ofInstance(Committers.nil());
            appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), nilCommitter);
            appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 2), nilCommitter);
            appenderator.add(IDENTIFIERS.get(1), ir("2000", "foo", 4), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001", "foo", 8), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001T01", "foo", 16), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001T02", "foo", 32), nilCommitter);
            appenderator.add(IDENTIFIERS.get(2), ir("2001T03", "foo", 64), nilCommitter);

            // Every descriptor targets IDENTIFIERS.get(2); only the queried interval varies.
            final String version = IDENTIFIERS.get(2).getVersion();
            final int partitionNum = IDENTIFIERS.get(2).getShardSpec().getPartitionNum();
            final SegmentDescriptor wholeSegment = new SegmentDescriptor(IDENTIFIERS.get(2).getInterval(), version, partitionNum);
            final SegmentDescriptor firstHour = new SegmentDescriptor(Intervals.of("2001/PT1H"), version, partitionNum);
            final SegmentDescriptor fourthHour = new SegmentDescriptor(Intervals.of("2001T03/PT1H"), version, partitionNum);
            final String queriedSegmentId = IDENTIFIERS.get(2).asSegmentId().toString();

            // Query 1: the segment's own interval -- all four of its rows.
            final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .aggregators(Arrays.asList(new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .intervals(new MultipleSpecificSegmentSpec(ImmutableList.of(wholeSegment)))
                    .build();
            final List<Result<TimeseriesResultValue>> results1 =
                    QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(
                    "query1",
                    ImmutableList.of(new Result<>(DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)))),
                    results1);
            this.verifySinkMetrics(serviceEmitter, new HashSet<>(Collections.singletonList(queriedSegmentId)));
            serviceEmitter.flush();

            // Query 2: only the first hour of the segment -- one row.
            final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .aggregators(Arrays.asList(new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .intervals(new MultipleSpecificSegmentSpec(ImmutableList.of(firstHour)))
                    .build();
            final List<Result<TimeseriesResultValue>> results2 =
                    QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(
                    "query2",
                    ImmutableList.of(new Result<>(DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 1L, "met", 8L)))),
                    results2);
            this.verifySinkMetrics(serviceEmitter, new HashSet<>(Collections.singletonList(queriedSegmentId)));
            serviceEmitter.flush();

            // Query 3: first and fourth hour as two descriptors -- two rows merged into one day bucket.
            final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
                    .dataSource("foo")
                    .aggregators(Arrays.asList(new LongSumAggregatorFactory("count", "count"), new LongSumAggregatorFactory("met", "met")))
                    .granularity(Granularities.DAY)
                    .intervals(new MultipleSpecificSegmentSpec(ImmutableList.of(firstHour, fourthHour)))
                    .build();
            final List<Result<TimeseriesResultValue>> results3 =
                    QueryPlus.wrap(query3).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(
                    "query3",
                    ImmutableList.of(new Result<>(DateTimes.of("2001"), new TimeseriesResultValue(ImmutableMap.of("count", 2L, "met", 72L)))),
                    results3);
            this.verifySinkMetrics(serviceEmitter, new HashSet<>(Collections.singletonList(queriedSegmentId)));
            serviceEmitter.flush();

            // Query 4: the same two descriptors as a scan query -- one result batch per descriptor.
            final ScanQuery query4 = Druids.newScanQueryBuilder()
                    .dataSource("foo")
                    .intervals(new MultipleSpecificSegmentSpec(ImmutableList.of(firstHour, fourthHour)))
                    .order(Order.ASCENDING)
                    .batchSize(10)
                    .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                    .build();
            final List<ScanResultValue> results4 =
                    QueryPlus.wrap(query4).run(appenderator, ResponseContext.createEmpty()).toList();
            Assert.assertEquals(2, results4.size());

            final String[] expectedColumns = {"__time", "dim", "count", "met"};
            Assert.assertArrayEquals(expectedColumns, results4.get(0).getColumns().toArray());
            Assert.assertArrayEquals(
                    new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
                    ((List<?>) ((List<?>) results4.get(0).getEvents()).get(0)).toArray());
            // Check the columns of the SECOND batch (index 1), matching the event assertion that follows.
            Assert.assertArrayEquals(expectedColumns, results4.get(1).getColumns().toArray());
            Assert.assertArrayEquals(
                    new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
                    ((List<?>) ((List<?>) results4.get(1).getEvents()).get(0)).toArray());
            this.verifySinkMetrics(serviceEmitter, new HashSet<>(Collections.singletonList(queriedSegmentId)));
            serviceEmitter.flush();
        }
    }

    /**
     * Asserts that the emitter captured exactly four metrics, one of them {@code query/cpu/time},
     * and that each of the three per-segment metrics has one event per expected segment, with every
     * expected segment id appearing among the user dimensions of some event of each metric.
     *
     * @param emitter    stub emitter whose captured metric events are inspected
     * @param segmentIds ids of the segments expected to have reported metrics
     */
    private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> segmentIds) {
        final String[] perSegmentMetrics = {"query/segment/time", "query/segmentAndCache/time", "query/wait/time"};

        // NOTE(review): the element type returned by getMetricEvents() is not visible in this part of
        // the file, so the map is left raw as in the original.
        final Map events = emitter.getMetricEvents();
        final int expectedPerMetric = segmentIds.size();

        Assert.assertEquals(4L, (long)events.size());
        Assert.assertTrue(events.containsKey("query/cpu/time"));

        // First pass: event counts per metric.
        for (String metric : perSegmentMetrics) {
            Assert.assertEquals((long)expectedPerMetric, (long)((List)events.get(metric)).size());
        }

        // Second pass: every expected segment id shows up under every per-segment metric.
        for (String id : segmentIds) {
            for (String metric : perSegmentMetrics) {
                Assert.assertTrue((boolean)((List)events.get(metric)).stream().anyMatch(value -> value.getUserDims().containsValue(id)));
            }
        }
    }

    /**
     * Adds rows one at a time, triggers the sink schema announcer by hand after each add, and
     * inspects the absolute and delta schemas recorded by {@link TestSchemaAnnouncer}.
     */
    @Test
    public void testSchemaAnnouncement() throws Exception {
        final TestSchemaAnnouncer dataSegmentAnnouncer = new TestSchemaAnnouncer();
        try (StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
                .maxRowsInMemory(2)
                .basePersistDirectory(this.temporaryFolder.newFolder())
                .build(dataSegmentAnnouncer, CentralizedDatasourceSchemaConfig.create())) {
            final StreamAppenderator appenderator = (StreamAppenderator) tester.getAppenderator();
            final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
            final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
            final StreamAppenderator.SinkSchemaAnnouncer sinkSchemaAnnouncer = appenderator.getSinkSchemaAnnouncer();
            final String firstSegmentId = IDENTIFIERS.get(0).asSegmentId().toString();
            final String secondSegmentId = IDENTIFIERS.get(1).asSegmentId().toString();

            Assert.assertEquals(null, appenderator.startJob());
            Assert.assertEquals("foo", appenderator.getDataSource());

            // Round 1: first row of the first segment -- absolute and delta both carry the full schema.
            commitMetadata.put("x", "1");
            appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
            sinkSchemaAnnouncer.computeAndAnnounce();
            List<Pair<String, SegmentSchemas>> announcedAbsoluteSchema = dataSegmentAnnouncer.getAnnouncedAbsoluteSchema();
            List<Pair<String, SegmentSchemas>> announcedDeltaSchema = dataSegmentAnnouncer.getAnnouncedDeltaSchema();
            Assert.assertEquals(1, announcedAbsoluteSchema.size());
            Assert.assertEquals(1, announcedDeltaSchema.size());

            Assert.assertEquals(appenderator.getId(), announcedAbsoluteSchema.get(0).lhs);
            final List<SegmentSchemas.SegmentSchema> absoluteRound1 = announcedAbsoluteSchema.get(0).rhs.getSegmentSchemaList();
            Assert.assertEquals(1, absoluteRound1.size());
            assertFullSegmentSchema(absoluteRound1.get(0), firstSegmentId, 1);

            Assert.assertEquals(appenderator.getId(), announcedDeltaSchema.get(0).lhs);
            final List<SegmentSchemas.SegmentSchema> deltaRound1 = announcedDeltaSchema.get(0).rhs.getSegmentSchemaList();
            final SegmentSchemas.SegmentSchema deltaSchemaId1Row1 = deltaRound1.get(0);
            Assert.assertEquals(1, deltaRound1.size());
            assertFullSegmentSchema(deltaSchemaId1Row1, firstSegmentId, 1);

            // Round 2: second row of the same segment -- absolute repeats the full schema with two rows,
            // the delta is flagged as a delta and carries no column changes.
            dataSegmentAnnouncer.clear();
            commitMetadata.put("x", "2");
            appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar", 2), committerSupplier);
            sinkSchemaAnnouncer.computeAndAnnounce();
            announcedAbsoluteSchema = dataSegmentAnnouncer.getAnnouncedAbsoluteSchema();
            announcedDeltaSchema = dataSegmentAnnouncer.getAnnouncedDeltaSchema();
            Assert.assertEquals(1, announcedAbsoluteSchema.size());
            Assert.assertEquals(1, announcedDeltaSchema.size());

            Assert.assertEquals(appenderator.getId(), announcedAbsoluteSchema.get(0).lhs);
            final List<SegmentSchemas.SegmentSchema> absoluteRound2 = announcedAbsoluteSchema.get(0).rhs.getSegmentSchemaList();
            Assert.assertEquals(1, absoluteRound2.size());
            assertFullSegmentSchema(absoluteRound2.get(0), firstSegmentId, 2);

            Assert.assertEquals(appenderator.getId(), announcedDeltaSchema.get(0).lhs);
            final List<SegmentSchemas.SegmentSchema> deltaRound2 = announcedDeltaSchema.get(0).rhs.getSegmentSchemaList();
            final SegmentSchemas.SegmentSchema deltaSchemaId1Row2 = deltaRound2.get(0);
            Assert.assertEquals(1, deltaRound2.size());
            Assert.assertEquals(firstSegmentId, deltaSchemaId1Row2.getSegmentId());
            Assert.assertEquals(2, deltaSchemaId1Row2.getNumRows().intValue());
            Assert.assertTrue(deltaSchemaId1Row2.isDelta());
            Assert.assertEquals(Collections.emptyList(), deltaSchemaId1Row2.getUpdatedColumns());
            Assert.assertEquals(Collections.emptyList(), deltaSchemaId1Row2.getNewColumns());
            Assert.assertEquals(Collections.emptyMap(), deltaSchemaId1Row2.getColumnTypeMap());

            // Round 3: first row of a second segment -- absolute lists both segments, the delta only
            // the newly added one.
            dataSegmentAnnouncer.clear();
            commitMetadata.put("x", "3");
            appenderator.add(IDENTIFIERS.get(1), ir("2000", "qux", 4), committerSupplier);
            sinkSchemaAnnouncer.computeAndAnnounce();
            announcedAbsoluteSchema = dataSegmentAnnouncer.getAnnouncedAbsoluteSchema();
            announcedDeltaSchema = dataSegmentAnnouncer.getAnnouncedDeltaSchema();
            Assert.assertEquals(1, announcedAbsoluteSchema.size());
            Assert.assertEquals(1, announcedDeltaSchema.size());

            Assert.assertEquals(appenderator.getId(), announcedAbsoluteSchema.get(0).lhs);
            final List<SegmentSchemas.SegmentSchema> absoluteRound3 = announcedAbsoluteSchema.get(0).rhs.getSegmentSchemaList();
            Assert.assertEquals(2, absoluteRound3.size());
            final SegmentSchemas.SegmentSchema absoluteSchemaId2Row1 = absoluteRound3.stream()
                    .filter(schema -> schema.getSegmentId().equals(secondSegmentId))
                    .findFirst()
                    .get();
            assertFullSegmentSchema(absoluteSchemaId2Row1, secondSegmentId, 1);

            Assert.assertEquals(appenderator.getId(), announcedDeltaSchema.get(0).lhs);
            final List<SegmentSchemas.SegmentSchema> deltaRound3 = announcedDeltaSchema.get(0).rhs.getSegmentSchemaList();
            final SegmentSchemas.SegmentSchema deltaSchemaId2Row1 = deltaRound3.stream()
                    .filter(schema -> schema.getSegmentId().equals(secondSegmentId))
                    .findFirst()
                    .get();
            Assert.assertEquals(1, deltaRound3.size());
            assertFullSegmentSchema(deltaSchemaId2Row1, secondSegmentId, 1);
        }
    }

    /**
     * Asserts that {@code schema} is a non-delta schema for {@code segmentId} with {@code numRows}
     * rows, no updated columns, and the four columns {@code __time}, {@code dim}, {@code count},
     * {@code met} reported as new together with their types.
     */
    private static void assertFullSegmentSchema(SegmentSchemas.SegmentSchema schema, String segmentId, int numRows) {
        Assert.assertEquals(segmentId, schema.getSegmentId());
        Assert.assertEquals(numRows, schema.getNumRows().intValue());
        Assert.assertFalse(schema.isDelta());
        Assert.assertEquals(Collections.emptyList(), schema.getUpdatedColumns());
        Assert.assertEquals(Lists.newArrayList("__time", "dim", "count", "met"), schema.getNewColumns());
        Assert.assertEquals(
                ImmutableMap.of("__time", ColumnType.LONG, "count", ColumnType.LONG, "dim", ColumnType.STRING, "met", ColumnType.LONG),
                schema.getColumnTypeMap());
    }

    /**
     * Builds a segment identifier for datasource {@code "foo"} with a linear shard spec.
     *
     * @param interval     interval string handed to {@code Intervals.of}
     * @param version      segment version
     * @param partitionNum partition number given to the linear shard spec
     */
    private static SegmentIdWithShardSpec si(String interval, String version, int partitionNum) {
        final ShardSpec shardSpec = new LinearShardSpec(Integer.valueOf(partitionNum));
        return new SegmentIdWithShardSpec("foo", Intervals.of(interval), version, shardSpec);
    }

    /**
     * Builds an input row with the single dimension {@code "dim"} and a {@code "met"} value.
     *
     * @param ts  timestamp string handed to {@code DateTimes.of}
     * @param dim value of the {@code "dim"} dimension
     * @param met value stored under {@code "met"}
     */
    static InputRow ir(String ts, String dim, Object met) {
        final long timestampMillis = DateTimes.of(ts).getMillis();
        final List<String> dimensions = ImmutableList.of("dim");
        final Map<String, Object> event = ImmutableMap.of("dim", dim, "met", met);
        return new MapBasedInputRow(timestampMillis, dimensions, event);
    }

    /**
     * Creates a supplier of committers whose metadata is an immutable snapshot of {@code map}.
     *
     * <p>The snapshot is taken each time {@link Supplier#get()} is called, so a committer reports
     * the map contents as of its own creation and is unaffected by later changes to {@code map}.
     *
     * @param map live map to snapshot; read on every {@code get()} call
     * @return supplier producing committers with a no-op {@code run()}
     */
    private static Supplier<Committer> committerSupplierFromConcurrentMap(final ConcurrentMap<String, String> map) {
        return new Supplier<Committer>() {
            @Override
            public Committer get() {
                // Freeze the contents now; returning the live map would let later puts leak into
                // committers that were handed out earlier.
                final Map<String, String> mapCopy = ImmutableMap.copyOf(map);
                return new Committer() {
                    @Override
                    public Object getMetadata() {
                        return mapCopy;
                    }

                    @Override
                    public void run() {
                        // Intentionally empty: there is nothing to commit.
                    }
                };
            }
        };
    }

    /**
     * Returns a sorted copy of {@code xs}, leaving the input list untouched.
     *
     * <p>Pairs of {@code SegmentIdWithShardSpec} are ordered by their own {@code compareTo}; pairs of
     * {@code DataSegment} are ordered by segment id. Any other pairing fails the comparison.
     *
     * @throws IllegalStateException if two compared elements are not both of one supported type
     */
    private static <T> List<T> sorted(List<T> xs) {
        final List<T> copy = Lists.newArrayList(xs);
        copy.sort((left, right) -> {
            if (left instanceof SegmentIdWithShardSpec && right instanceof SegmentIdWithShardSpec) {
                final SegmentIdWithShardSpec leftId = (SegmentIdWithShardSpec) left;
                return leftId.compareTo((SegmentIdWithShardSpec) right);
            }
            if (left instanceof DataSegment && right instanceof DataSegment) {
                final DataSegment leftSegment = (DataSegment) left;
                final DataSegment rightSegment = (DataSegment) right;
                return leftSegment.getId().compareTo(rightSegment.getId());
            }
            throw new IllegalStateException("BAD");
        });
        return copy;
    }

    /**
     * {@link DataSegmentAnnouncer} test double: segment (un)announcements are ignored, while schema
     * announcements and per-task schema removals are recorded in call order for later inspection.
     */
    static class TestSchemaAnnouncer
    implements DataSegmentAnnouncer {
        /** (taskId, absolute schemas) for each {@code announceSegmentSchemas} call. */
        private final List<Pair<String, SegmentSchemas>> announcedAbsoluteSchema = new ArrayList<>();
        /** (taskId, schema change) for each {@code announceSegmentSchemas} call; the change may be null. */
        private final List<Pair<String, SegmentSchemas>> announcedDeltaSchema = new ArrayList<>();
        /** Task ids passed to {@code removeSegmentSchemasForTask}. */
        private final List<String> unnanouncementEvents = new ArrayList<>();

        public void announceSegment(DataSegment segment) {
            // Segment announcements are not recorded.
        }

        public void unannounceSegment(DataSegment segment) {
            // Segment announcements are not recorded.
        }

        public void announceSegments(Iterable<DataSegment> segments) {
            // Segment announcements are not recorded.
        }

        public void unannounceSegments(Iterable<DataSegment> segments) {
            // Segment announcements are not recorded.
        }

        public void announceSegmentSchemas(String taskId, SegmentSchemas segmentSchemas, @Nullable SegmentSchemas segmentSchemasChange) {
            // One entry is appended to each list per call, so indexes line up between the two.
            final Pair<String, SegmentSchemas> absolute = Pair.of(taskId, segmentSchemas);
            final Pair<String, SegmentSchemas> delta = Pair.of(taskId, segmentSchemasChange);
            this.announcedAbsoluteSchema.add(absolute);
            this.announcedDeltaSchema.add(delta);
        }

        public void removeSegmentSchemasForTask(String taskId) {
            this.unnanouncementEvents.add(taskId);
        }

        /** Returns the live list of recorded absolute schemas (not a copy). */
        public List<Pair<String, SegmentSchemas>> getAnnouncedAbsoluteSchema() {
            return this.announcedAbsoluteSchema;
        }

        /** Returns the live list of recorded schema changes (not a copy). */
        public List<Pair<String, SegmentSchemas>> getAnnouncedDeltaSchema() {
            return this.announcedDeltaSchema;
        }

        /** Returns the live list of task ids whose schemas were removed (not a copy). */
        public List<String> getUnnanouncementEvents() {
            return this.unnanouncementEvents;
        }

        /** Discards everything recorded so far. */
        public void clear() {
            this.announcedAbsoluteSchema.clear();
            this.announcedDeltaSchema.clear();
            this.unnanouncementEvents.clear();
        }
    }
}

