/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import org.apache.druid.indexing.common.task.AppenderatorDriverRealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.indexing.test.TestDataSegmentPusher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.ReadablePeriod;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

public class AppenderatorDriverRealtimeIndexTaskTest
extends InitializedNullHandlingTest {
    private static final Logger log = new Logger(AppenderatorDriverRealtimeIndexTaskTest.class);
    private static final ServiceEmitter EMITTER = new ServiceEmitter("service", "host", (Emitter)new NoopEmitter());
    private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
    private static final String FAIL_DIM = "__fail__";
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
    private DateTime now;
    private ListeningExecutorService taskExec;
    private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
    private Collection<DataSegment> publishedSegments;
    private CountDownLatch segmentLatch;
    private CountDownLatch handoffLatch;
    private TaskStorage taskStorage;
    private TaskLockbox taskLockbox;
    private TaskToolboxFactory taskToolboxFactory;
    private File baseDir;
    private File reportsFile;

    @Before
    public void setUp() throws IOException {
        EmittingLogger.registerEmitter((ServiceEmitter)EMITTER);
        EMITTER.start();
        this.taskExec = MoreExecutors.listeningDecorator((ExecutorService)Execs.singleThreaded((String)"realtime-index-task-test-%d"));
        this.now = DateTimes.nowUtc();
        TestDerbyConnector derbyConnector = this.derbyConnectorRule.getConnector();
        derbyConnector.createDataSourceTable();
        derbyConnector.createTaskTables();
        derbyConnector.createSegmentTable();
        derbyConnector.createPendingSegmentsTable();
        this.baseDir = this.tempFolder.newFolder();
        this.reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" + System.currentTimeMillis(), "json");
        this.makeToolboxFactory(this.baseDir);
    }

    @After
    public void tearDown() {
        this.taskExec.shutdownNow();
        this.reportsFile.delete();
    }

    @Test(timeout=60000L)
    public void testDefaultResource() {
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null);
        Assert.assertEquals((Object)task.getId(), (Object)task.getTaskResource().getAvailabilityGroup());
    }

    @Test(timeout=60000L)
    public void testHandoffTimeout() throws Exception {
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null, TransformSpec.NONE, true, 100L, true, 0, 1);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1")));
        firehose.close();
        TaskStatus status = (TaskStatus)statusFuture.get();
        Assert.assertTrue((boolean)status.getErrorMsg().contains("java.util.concurrent.TimeoutException: Timeout waiting for task."));
    }

    @Test(timeout=60000L)
    public void testBasics() throws Exception {
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null);
        Assert.assertTrue((boolean)task.supportsQueries());
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1"), (Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getUnparseable());
        Assert.assertEquals((long)2L, (long)this.sumMetric((Task)task, null, "rows"));
        Assert.assertEquals((long)3L, (long)this.sumMetric((Task)task, null, "met1"));
        this.awaitHandoffs();
        for (DataSegment publishedSegment : publishedSegments) {
            Pair<Executor, Runnable> executorRunnablePair = this.handOffCallbacks.get(new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()));
            Assert.assertNotNull((String)(publishedSegment + " missing from handoff callbacks: " + this.handOffCallbacks), executorRunnablePair);
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
    }

    @Test(timeout=60000L)
    public void testLateData() throws Exception {
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1"), (Object)ImmutableMap.of((Object)"t", (Object)this.now.minus((ReadablePeriod)new Period((Object)"P2D")).getMillis(), (Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getUnparseable());
        Assert.assertEquals((long)2L, (long)this.sumMetric((Task)task, null, "rows"));
        Assert.assertEquals((long)3L, (long)this.sumMetric((Task)task, null, "met1"));
        this.awaitHandoffs();
        for (DataSegment publishedSegment : publishedSegments) {
            Pair<Executor, Runnable> executorRunnablePair = this.handOffCallbacks.get(new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()));
            Assert.assertNotNull((String)(publishedSegment + " missing from handoff callbacks: " + this.handOffCallbacks), executorRunnablePair);
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
    }

    @Test(timeout=60000L)
    public void testMaxRowsPerSegment() throws Exception {
        this.expectPublishedSegments(2);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        for (int i = 0; i < 2000; ++i) {
            firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)("foo-" + i), (Object)"met1", (Object)"1")));
        }
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        Assert.assertEquals((long)2000L, (long)task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getUnparseable());
        Assert.assertEquals((long)2000L, (long)this.sumMetric((Task)task, null, "rows"));
        Assert.assertEquals((long)2000L, (long)this.sumMetric((Task)task, null, "met1"));
        this.awaitHandoffs();
        for (DataSegment publishedSegment : publishedSegments) {
            Pair<Executor, Runnable> executorRunnablePair = this.handOffCallbacks.get(new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()));
            Assert.assertNotNull((String)(publishedSegment + " missing from handoff callbacks: " + this.handOffCallbacks), executorRunnablePair);
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
    }

    @Test(timeout=60000L)
    public void testMaxTotalRows() throws Exception {
        this.expectPublishedSegments(2);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null, Integer.MAX_VALUE, 1500L);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        for (int i = 0; i < 2000; ++i) {
            firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)("foo-" + i), (Object)"met1", (Object)"1")));
        }
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        Assert.assertEquals((long)2000L, (long)task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getUnparseable());
        Assert.assertEquals((long)2000L, (long)this.sumMetric((Task)task, null, "rows"));
        Assert.assertEquals((long)2000L, (long)this.sumMetric((Task)task, null, "met1"));
        this.awaitHandoffs();
        Assert.assertEquals((long)2L, (long)publishedSegments.size());
        for (DataSegment publishedSegment : publishedSegments) {
            Pair<Executor, Runnable> executorRunnablePair = this.handOffCallbacks.get(new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()));
            Assert.assertNotNull((String)(publishedSegment + " missing from handoff callbacks: " + this.handOffCallbacks), executorRunnablePair);
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
    }

    @Test(timeout=60000L)
    public void testTransformSpec() throws Exception {
        this.expectPublishedSegments(2);
        TransformSpec transformSpec = new TransformSpec((DimFilter)new SelectorDimFilter("dim1", "foo", null), (List)ImmutableList.of((Object)new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())));
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null, transformSpec, true, 0L, true, 0, 1);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1"), (Object)ImmutableMap.of((Object)"t", (Object)this.now.minus((ReadablePeriod)new Period((Object)"P1D")).getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)2.0), (Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals((long)1L, (long)task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getUnparseable());
        Assert.assertEquals((long)2L, (long)this.sumMetric((Task)task, null, "rows"));
        Assert.assertEquals((long)2L, (long)this.sumMetric((Task)task, (DimFilter)new SelectorDimFilter("dim1t", "foofoo", null), "rows"));
        if (NullHandling.replaceWithDefault()) {
            Assert.assertEquals((long)0L, (long)this.sumMetric((Task)task, (DimFilter)new SelectorDimFilter("dim1t", "barbar", null), "metric1"));
        } else {
            Assert.assertNull((Object)this.sumMetric((Task)task, (DimFilter)new SelectorDimFilter("dim1t", "barbar", null), "metric1"));
        }
        Assert.assertEquals((long)3L, (long)this.sumMetric((Task)task, null, "met1"));
        this.awaitHandoffs();
        for (DataSegment publishedSegment : publishedSegments) {
            Pair<Executor, Runnable> executorRunnablePair = this.handOffCallbacks.get(new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()));
            Assert.assertNotNull((String)(publishedSegment + " missing from handoff callbacks: " + this.handOffCallbacks), executorRunnablePair);
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
    }

    @Test(timeout=60000L)
    public void testReportParseExceptionsOnBadMetric() throws Exception {
        this.expectPublishedSegments(0);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null, true);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)2000000L, (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1"), (Object)ImmutableMap.of((Object)"t", (Object)3000000L, (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"foo"), (Object)ImmutableMap.of((Object)"t", (Object)this.now.minus((ReadablePeriod)new Period((Object)"P1D")).getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"foo"), (Object)ImmutableMap.of((Object)"t", (Object)4000000L, (Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
        firehose.close();
        TaskStatus status = (TaskStatus)statusFuture.get();
        Assert.assertTrue((boolean)status.getErrorMsg().contains("org.apache.druid.java.util.common.RE: Max parse exceptions[0] exceeded"));
        IngestionStatsAndErrorsTaskReportData reportData = this.getTaskReportData();
        List parseExceptionReports = (List)reportData.getUnparseableEvents().get("buildSegments");
        ImmutableList expectedMessages = ImmutableList.of((Object)"Unable to parse value[foo] for field[met1]");
        List actualMessages = parseExceptionReports.stream().map(r -> (String)((List)r.get("details")).get(0)).collect(Collectors.toList());
        Assert.assertEquals((Object)expectedMessages, actualMessages);
        ImmutableList expectedInputs = ImmutableList.of((Object)"{t=3000000, dim1=foo, met1=foo}");
        List actualInputs = parseExceptionReports.stream().map(r -> (String)r.get("input")).collect(Collectors.toList());
        Assert.assertEquals((Object)expectedInputs, actualInputs);
    }

    @Test(timeout=60000L)
    public void testNoReportParseExceptions() throws Exception {
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null, TransformSpec.NONE, false, 0L, true, null, 1);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows(Arrays.asList(ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1"), null, ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"foo"), ImmutableMap.of((Object)"dim1", (Object)"foo", (Object)"met1", (Object)2.0, (Object)FAIL_DIM, (Object)"x"), ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        DataSegment publishedSegment = (DataSegment)Iterables.getOnlyElement(publishedSegments);
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals((long)1L, (long)task.getRowIngestionMeters().getProcessedWithError());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getUnparseable());
        Assert.assertEquals((long)3L, (long)this.sumMetric((Task)task, null, "rows"));
        Assert.assertEquals((long)3L, (long)this.sumMetric((Task)task, null, "met1"));
        this.awaitHandoffs();
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : this.handOffCallbacks.entrySet()) {
            Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
            Assert.assertEquals((Object)new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()), (Object)entry.getKey());
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        ImmutableMap expectedMetrics = ImmutableMap.of((Object)"buildSegments", (Object)ImmutableMap.of((Object)"processed", (Object)2, (Object)"processedWithError", (Object)1, (Object)"unparseable", (Object)2, (Object)"thrownAway", (Object)0));
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
        IngestionStatsAndErrorsTaskReportData reportData = this.getTaskReportData();
        Assert.assertEquals((Object)expectedMetrics, (Object)reportData.getRowStats());
    }

    @Test(timeout=60000L)
    public void testMultipleParseExceptionsSuccess() throws Exception {
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null, TransformSpec.NONE, false, 0L, true, 10, 10);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows(Arrays.asList(ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1"), null, ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"foo"), ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim1", (Object)"foo", (Object)"dimLong", (Object)"notnumber", (Object)"dimFloat", (Object)"notnumber", (Object)"met1", (Object)"foo"), ImmutableMap.of((Object)"dim1", (Object)"foo", (Object)"met1", (Object)2.0, (Object)FAIL_DIM, (Object)"x"), ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        DataSegment publishedSegment = (DataSegment)Iterables.getOnlyElement(publishedSegments);
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getProcessed());
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getProcessedWithError());
        Assert.assertEquals((long)0L, (long)task.getRowIngestionMeters().getThrownAway());
        Assert.assertEquals((long)2L, (long)task.getRowIngestionMeters().getUnparseable());
        Assert.assertEquals((long)4L, (long)this.sumMetric((Task)task, null, "rows"));
        Assert.assertEquals((long)3L, (long)this.sumMetric((Task)task, null, "met1"));
        this.awaitHandoffs();
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : this.handOffCallbacks.entrySet()) {
            Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
            Assert.assertEquals((Object)new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()), (Object)entry.getKey());
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        ImmutableMap expectedMetrics = ImmutableMap.of((Object)"buildSegments", (Object)ImmutableMap.of((Object)"processed", (Object)2, (Object)"processedWithError", (Object)2, (Object)"unparseable", (Object)2, (Object)"thrownAway", (Object)0));
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
        IngestionStatsAndErrorsTaskReportData reportData = this.getTaskReportData();
        Assert.assertEquals((Object)expectedMetrics, (Object)reportData.getRowStats());
        List parseExceptionReports = (List)reportData.getUnparseableEvents().get("buildSegments");
        List<String> expectedMessages = Arrays.asList("Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}", "could not convert value [notnumber] to long", "Unable to parse value[foo] for field[met1]", "Timestamp[null] is unparseable! Event: null");
        List actualMessages = parseExceptionReports.stream().map(r -> (String)((List)r.get("details")).get(0)).collect(Collectors.toList());
        Assert.assertEquals(expectedMessages, actualMessages);
        List<String> expectedInputs = Arrays.asList("{dim1=foo, met1=2.0, __fail__=x}", "{t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}", "{t=1521251960729, dim1=foo, met1=foo}", null);
        List actualInputs = parseExceptionReports.stream().map(r -> (String)r.get("input")).collect(Collectors.toList());
        Assert.assertEquals(expectedInputs, actualInputs);
        Assert.assertEquals((Object)IngestionState.COMPLETED, (Object)reportData.getIngestionState());
    }

    @Test(timeout=60000L)
    public void testMultipleParseExceptionsFailure() throws Exception {
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task = this.makeRealtimeTask(null, TransformSpec.NONE, false, 0L, true, 3, 10);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task);
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task.getFirehose();
        firehose.addRows(Arrays.asList(ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"1"), null, ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim1", (Object)"foo", (Object)"met1", (Object)"foo"), ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim1", (Object)"foo", (Object)"dimLong", (Object)"notnumber", (Object)"dimFloat", (Object)"notnumber", (Object)"met1", (Object)"foo"), ImmutableMap.of((Object)"dim1", (Object)"foo", (Object)"met1", (Object)2.0, (Object)FAIL_DIM, (Object)"x"), ImmutableMap.of((Object)"t", (Object)1521251960729L, (Object)"dim2", (Object)"bar", (Object)"met1", (Object)2.0)));
        firehose.close();
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.FAILED, (Object)taskStatus.getStatusCode());
        Assert.assertTrue((boolean)taskStatus.getErrorMsg().contains("Max parse exceptions[3] exceeded"));
        IngestionStatsAndErrorsTaskReportData reportData = this.getTaskReportData();
        ImmutableMap expectedMetrics = ImmutableMap.of((Object)"buildSegments", (Object)ImmutableMap.of((Object)"processed", (Object)1, (Object)"processedWithError", (Object)2, (Object)"unparseable", (Object)2, (Object)"thrownAway", (Object)0));
        Assert.assertEquals((Object)expectedMetrics, (Object)reportData.getRowStats());
        List parseExceptionReports = (List)reportData.getUnparseableEvents().get("buildSegments");
        ImmutableList expectedMessages = ImmutableList.of((Object)"Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}", (Object)"could not convert value [notnumber] to long", (Object)"Unable to parse value[foo] for field[met1]", (Object)"Timestamp[null] is unparseable! Event: null");
        List actualMessages = parseExceptionReports.stream().map(r -> (String)((List)r.get("details")).get(0)).collect(Collectors.toList());
        Assert.assertEquals((Object)expectedMessages, actualMessages);
        List<String> expectedInputs = Arrays.asList("{dim1=foo, met1=2.0, __fail__=x}", "{t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}", "{t=1521251960729, dim1=foo, met1=foo}", null);
        List actualInputs = parseExceptionReports.stream().map(r -> (String)r.get("input")).collect(Collectors.toList());
        Assert.assertEquals(expectedInputs, actualInputs);
        Assert.assertEquals((Object)IngestionState.BUILD_SEGMENTS, (Object)reportData.getIngestionState());
    }

    @Test(timeout=60000L)
    public void testRestore() throws Exception {
        this.expectPublishedSegments(0);
        AppenderatorDriverRealtimeIndexTask task1 = this.makeRealtimeTask(null);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task1);
        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task1.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo")));
        task1.stopGracefully(this.taskToolboxFactory.build((Task)task1).getConfig());
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
        Assert.assertTrue((boolean)this.publishedSegments.isEmpty());
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task2 = this.makeRealtimeTask(task1.getId());
        ListenableFuture<TaskStatus> statusFuture2 = this.runTask((Task)task2);
        while (task2.getFirehose() == null) {
            Thread.sleep(50L);
        }
        Assert.assertEquals((long)1L, (long)this.sumMetric((Task)task2, null, "rows"));
        TestFirehose firehose2 = (TestFirehose)task2.getFirehose();
        firehose2.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim2", (Object)"bar")));
        firehose2.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        DataSegment publishedSegment = (DataSegment)Iterables.getOnlyElement(publishedSegments);
        Assert.assertEquals((long)2L, (long)this.sumMetric((Task)task2, null, "rows"));
        this.awaitHandoffs();
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : this.handOffCallbacks.entrySet()) {
            Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
            Assert.assertEquals((Object)new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()), (Object)entry.getKey());
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        TaskStatus taskStatus2 = (TaskStatus)statusFuture2.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus2.getStatusCode());
    }

    @Test(timeout=60000L)
    public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception {
        AppenderatorDriverRealtimeIndexTask task1 = this.makeRealtimeTask(null);
        this.expectPublishedSegments(1);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task1);
        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task1.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo")));
        firehose.close();
        Collection<DataSegment> publishedSegments = this.awaitSegments();
        DataSegment publishedSegment = (DataSegment)Iterables.getOnlyElement(publishedSegments);
        Assert.assertEquals((long)1L, (long)this.sumMetric((Task)task1, null, "rows"));
        task1.stopGracefully(this.taskToolboxFactory.build((Task)task1).getConfig());
        while (!statusFuture.isDone()) {
            Thread.sleep(50L);
        }
        this.expectPublishedSegments(1);
        AppenderatorDriverRealtimeIndexTask task2 = this.makeRealtimeTask(task1.getId());
        ListenableFuture<TaskStatus> statusFuture2 = this.runTask((Task)task2);
        while (task2.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose2 = (TestFirehose)task2.getFirehose();
        firehose2.close();
        this.awaitHandoffs();
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : this.handOffCallbacks.entrySet()) {
            Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
            Assert.assertEquals((Object)new SegmentDescriptor(publishedSegment.getInterval(), publishedSegment.getVersion(), publishedSegment.getShardSpec().getPartitionNum()), (Object)entry.getKey());
            ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        }
        this.handOffCallbacks.clear();
        TaskStatus taskStatus = (TaskStatus)statusFuture2.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
    }

    @Test(timeout=60000L)
    public void testRestoreCorruptData() throws Exception {
        AppenderatorDriverRealtimeIndexTask task1 = this.makeRealtimeTask(null);
        this.expectPublishedSegments(0);
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task1);
        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }
        TestFirehose firehose = (TestFirehose)task1.getFirehose();
        firehose.addRows((List<Map<String, Object>>)ImmutableList.of((Object)ImmutableMap.of((Object)"t", (Object)this.now.getMillis(), (Object)"dim1", (Object)"foo")));
        task1.stopGracefully(this.taskToolboxFactory.build((Task)task1).getConfig());
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
        Assert.assertTrue((boolean)this.publishedSegments.isEmpty());
        Optional<File> optional = FileUtils.listFiles((File)this.baseDir, null, (boolean)true).stream().filter(f -> f.getName().equals("00000.smoosh")).findFirst();
        Assert.assertTrue((String)"Could not find smoosh file", (boolean)optional.isPresent());
        File smooshFile = optional.get();
        Files.write(smooshFile.toPath(), StringUtils.toUtf8((String)"oops!"), new OpenOption[0]);
        this.expectPublishedSegments(0);
        AppenderatorDriverRealtimeIndexTask task2 = this.makeRealtimeTask(task1.getId());
        ListenableFuture<TaskStatus> statusFuture2 = this.runTask((Task)task2);
        TaskStatus status = (TaskStatus)statusFuture2.get();
        ImmutableMap expectedMetrics = ImmutableMap.of((Object)"buildSegments", (Object)ImmutableMap.of((Object)"processedWithError", (Object)0, (Object)"processed", (Object)0, (Object)"unparseable", (Object)0, (Object)"thrownAway", (Object)0));
        IngestionStatsAndErrorsTaskReportData reportData = this.getTaskReportData();
        Assert.assertEquals((Object)expectedMetrics, (Object)reportData.getRowStats());
        Pattern errorPattern = Pattern.compile("(?s)java\\.lang\\.IllegalArgumentException.*\n\tat (java\\.base/)?java\\.nio\\.Buffer\\..*");
        Assert.assertTrue((boolean)errorPattern.matcher(status.getErrorMsg()).matches());
    }

    @Test(timeout=60000L)
    public void testStopBeforeStarting() throws Exception {
        this.expectPublishedSegments(0);
        AppenderatorDriverRealtimeIndexTask task1 = this.makeRealtimeTask(null);
        task1.stopGracefully(this.taskToolboxFactory.build((Task)task1).getConfig());
        ListenableFuture<TaskStatus> statusFuture = this.runTask((Task)task1);
        TaskStatus taskStatus = (TaskStatus)statusFuture.get();
        Assert.assertEquals((Object)TaskState.SUCCESS, (Object)taskStatus.getStatusCode());
    }

    private ListenableFuture<TaskStatus> runTask(Task task) {
        try {
            this.taskStorage.insert(task, TaskStatus.running((String)task.getId()));
        }
        catch (EntryExistsException entryExistsException) {
            // empty catch block
        }
        this.taskLockbox.syncFromStorage();
        TaskToolbox toolbox = this.taskToolboxFactory.build(task);
        return this.taskExec.submit(() -> {
            try {
                if (task.isReady(toolbox.getTaskActionClient())) {
                    return task.run(toolbox);
                }
                throw new ISE("Task is not ready", new Object[0]);
            }
            catch (Exception e) {
                log.warn((Throwable)e, "Task failed", new Object[0]);
                throw e;
            }
        });
    }

    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String taskId) {
        return this.makeRealtimeTask(taskId, TransformSpec.NONE, true, 0L, true, 0, 1);
    }

    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String taskId, Integer maxRowsPerSegment, Long maxTotalRows) {
        return this.makeRealtimeTask(taskId, TransformSpec.NONE, true, 0L, true, 0, 1, maxRowsPerSegment, maxTotalRows);
    }

    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String taskId, boolean reportParseExceptions) {
        return this.makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, 0L, true, null, 1);
    }

    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String taskId, TransformSpec transformSpec, boolean reportParseExceptions, long handoffTimeout, Boolean logParseExceptions, Integer maxParseExceptions, Integer maxSavedParseExceptions) {
        return this.makeRealtimeTask(taskId, transformSpec, reportParseExceptions, handoffTimeout, logParseExceptions, maxParseExceptions, maxSavedParseExceptions, 1000, null);
    }

    private AppenderatorDriverRealtimeIndexTask makeRealtimeTask(String taskId, TransformSpec transformSpec, boolean reportParseExceptions, long handoffTimeout, Boolean logParseExceptions, Integer maxParseExceptions, Integer maxSavedParseExceptions, Integer maxRowsPerSegment, Long maxTotalRows) {
        DataSchema dataSchema = new DataSchema("test_ds", (Map)TestHelper.makeJsonMapper().convertValue((Object)new MapInputRowParser((ParseSpec)new TimeAndDimsParseSpec(new TimestampSpec("t", "auto", null), new DimensionsSpec((List)ImmutableList.of((Object)new StringDimensionSchema("dim1"), (Object)new StringDimensionSchema("dim2"), (Object)new StringDimensionSchema("dim1t"), (Object)new LongDimensionSchema("dimLong"), (Object)new FloatDimensionSchema("dimFloat"))))), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT), new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")}, (GranularitySpec)new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), transformSpec, OBJECT_MAPPER);
        RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig((FirehoseFactory)new TestFirehoseFactory(), null);
        RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig(null, Integer.valueOf(1000), null, null, maxRowsPerSegment, maxTotalRows, null, null, null, null, null, null, Boolean.valueOf(reportParseExceptions), Long.valueOf(handoffTimeout), null, null, logParseExceptions, maxParseExceptions, maxSavedParseExceptions);
        return new AppenderatorDriverRealtimeIndexTask(taskId, null, new RealtimeAppenderatorIngestionSpec(dataSchema, realtimeIOConfig, tuningConfig), null){

            protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
                return true;
            }
        };
    }

    private void expectPublishedSegments(int count) {
        this.segmentLatch = new CountDownLatch(count);
        this.handoffLatch = new CountDownLatch(count);
    }

    private Collection<DataSegment> awaitSegments() throws InterruptedException {
        Assert.assertTrue((String)"Timed out waiting for segments to be published", (boolean)this.segmentLatch.await(1L, TimeUnit.MINUTES));
        return this.publishedSegments;
    }

    private void awaitHandoffs() throws InterruptedException {
        Assert.assertTrue((String)"Timed out waiting for segments to be handed off", (boolean)this.handoffLatch.await(1L, TimeUnit.MINUTES));
    }

    private void makeToolboxFactory(File directory) {
        this.taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
        this.publishedSegments = new CopyOnWriteArrayList<DataSegment>();
        DefaultObjectMapper mapper = new DefaultObjectMapper();
        mapper.registerSubtypes(new Class[]{LinearShardSpec.class});
        mapper.registerSubtypes(new Class[]{NumberedShardSpec.class});
        IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator((ObjectMapper)mapper, (MetadataStorageTablesConfig)this.derbyConnectorRule.metadataTablesConfigSupplier().get(), (SQLMetadataConnector)this.derbyConnectorRule.getConnector()){

            public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException {
                Set result = super.announceHistoricalSegments(segments);
                Assert.assertFalse((String)"Segment latch not initialized, did you forget to call expectPublishSegments?", (AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch == null ? 1 : 0) != 0);
                AppenderatorDriverRealtimeIndexTaskTest.this.publishedSegments.addAll(result);
                segments.forEach(s -> AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch.countDown());
                return result;
            }

            public SegmentPublishResult announceHistoricalSegments(Set<DataSegment> segments, Set<DataSegment> segmentsToDrop, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata) throws IOException {
                SegmentPublishResult result = super.announceHistoricalSegments(segments, segmentsToDrop, startMetadata, endMetadata);
                Assert.assertFalse((String)"Segment latch not initialized, did you forget to call expectPublishSegments?", (AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch == null ? 1 : 0) != 0);
                AppenderatorDriverRealtimeIndexTaskTest.this.publishedSegments.addAll(result.getSegments());
                result.getSegments().forEach(s -> AppenderatorDriverRealtimeIndexTaskTest.this.segmentLatch.countDown());
                return result;
            }
        };
        this.taskLockbox = new TaskLockbox(this.taskStorage, (IndexerMetadataStorageCoordinator)mdc);
        TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, Integer.valueOf(50000), null, true, null, null, null, false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null);
        TaskActionToolbox taskActionToolbox = new TaskActionToolbox(this.taskLockbox, this.taskStorage, (IndexerMetadataStorageCoordinator)mdc, EMITTER, (SupervisorManager)EasyMock.createMock(SupervisorManager.class));
        LocalTaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(this.taskStorage, taskActionToolbox, new TaskAuditLogConfig(false));
        DefaultQueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate((Map)ImmutableMap.of(TimeseriesQuery.class, (Object)new TimeseriesQueryRunnerFactory(new TimeseriesQueryQueryToolChest(), new TimeseriesQueryEngine(), (query, future) -> {})));
        this.handOffCallbacks = new ConcurrentHashMap<SegmentDescriptor, Pair<Executor, Runnable>>();
        SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier(){

            public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable) {
                AppenderatorDriverRealtimeIndexTaskTest.this.handOffCallbacks.put(descriptor, new Pair((Object)exec, (Object)handOffRunnable));
                AppenderatorDriverRealtimeIndexTaskTest.this.handoffLatch.countDown();
                return true;
            }

            public void start() {
            }

            public void close() {
            }
        };
        TestUtils testUtils = new TestUtils();
        this.taskToolboxFactory = new TaskToolboxFactory(taskConfig, new DruidNode("druid/middlemanager", "localhost", false, Integer.valueOf(8091), null, true, false), (TaskActionClientFactory)taskActionClientFactory, EMITTER, (DataSegmentPusher)new TestDataSegmentPusher(), (DataSegmentKiller)new TestDataSegmentKiller(), null, null, (DataSegmentAnnouncer)new TestDataSegmentAnnouncer(), (DataSegmentServerAnnouncer)EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, () -> AppenderatorDriverRealtimeIndexTaskTest.lambda$makeToolboxFactory$10((QueryRunnerFactoryConglomerate)conglomerate), (QueryProcessingPool)DirectQueryProcessingPool.INSTANCE, (JoinableFactory)NoopJoinableFactory.INSTANCE, () -> (MonitorScheduler)EasyMock.createMock(MonitorScheduler.class), new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()), testUtils.getTestObjectMapper(), testUtils.getTestIndexIO(), MapCache.create((long)1024L), new CacheConfig(), new CachePopulatorStats(), testUtils.getIndexMergerV9Factory(), (DruidNodeAnnouncer)EasyMock.createNiceMock(DruidNodeAnnouncer.class), (DruidNode)EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), new DataNodeService("tier", 1000L, ServerType.INDEXER_EXECUTOR, 0), (TaskReportFileWriter)new SingleFileTaskReportFileWriter(this.reportsFile), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, (ChatHandlerProvider)new NoopChatHandlerProvider(), testUtils.getRowIngestionMetersFactory(), (AppenderatorsManager)new TestAppenderatorsManager(), (OverlordClient)new NoopOverlordClient(), null, null, null);
    }

    @Nullable
    public Long sumMetric(Task task, DimFilter filter, String metric) {
        TimeseriesQuery query = Druids.newTimeseriesQueryBuilder().dataSource("test_ds").filters(filter).aggregators((List)ImmutableList.of((Object)new LongSumAggregatorFactory(metric, metric))).granularity(Granularities.ALL).intervals("2000/3000").build();
        List results = task.getQueryRunner((Query)query).run(QueryPlus.wrap((Query)query)).toList();
        if (results.isEmpty()) {
            return 0L;
        }
        return ((TimeseriesResultValue)((Result)results.get(0)).getValue()).getLongMetric(metric);
    }

    private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException {
        Map taskReports = (Map)OBJECT_MAPPER.readValue(this.reportsFile, (TypeReference)new TypeReference<Map<String, TaskReport>>(){});
        return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports((Map)taskReports);
    }

    private static /* synthetic */ QueryRunnerFactoryConglomerate lambda$makeToolboxFactory$10(QueryRunnerFactoryConglomerate conglomerate) {
        return conglomerate;
    }

    private static class TestFirehoseFactory
    implements FirehoseFactory<InputRowParser> {
        public Firehose connect(InputRowParser parser, File temporaryDirectory) throws ParseException {
            return new TestFirehose((InputRowParser<Map<String, Object>>)parser);
        }
    }

    private static class TestFirehose
    implements Firehose {
        private final InputRowParser<Map<String, Object>> parser;
        private final Deque<Optional<Map<String, Object>>> queue = new ArrayDeque<Optional<Map<String, Object>>>();
        private boolean closed = false;

        public TestFirehose(InputRowParser<Map<String, Object>> parser) {
            this.parser = parser;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void addRows(List<Map<String, Object>> rows) {
            TestFirehose testFirehose = this;
            synchronized (testFirehose) {
                rows.stream().map(Optional::ofNullable).forEach(this.queue::add);
                this.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean hasMore() {
            try {
                TestFirehose testFirehose = this;
                synchronized (testFirehose) {
                    while (this.queue.isEmpty() && !this.closed) {
                        this.wait();
                    }
                    return !this.queue.isEmpty();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public InputRow nextRow() {
            TestFirehose testFirehose = this;
            synchronized (testFirehose) {
                InputRow row = (InputRow)this.parser.parseBatch(this.queue.removeFirst().orElse(null)).get(0);
                if (row != null && row.getRaw(AppenderatorDriverRealtimeIndexTaskTest.FAIL_DIM) != null) {
                    throw new ParseException(null, AppenderatorDriverRealtimeIndexTaskTest.FAIL_DIM, new Object[0]);
                }
                return row;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            TestFirehose testFirehose = this;
            synchronized (testFirehose) {
                this.closed = true;
                this.notifyAll();
            }
        }
    }
}

