/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestFirehose;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.indexing.test.TestDataSegmentPusher;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.RejectionPolicyFactory;
import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.ReadablePeriod;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

public class RealtimeIndexTaskTest
extends InitializedNullHandlingTest {
    /** Logger used by {@code runTask} to record a task failure before rethrowing it. */
    private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
    /** Service emitter backed by a {@link NoopEmitter}; registered with {@link EmittingLogger} and started in {@link #setUp()}. */
    private static final ServiceEmitter EMITTER = new ServiceEmitter("service", "host", (Emitter)new NoopEmitter());
    /** JUnit rule used by tests that expect the task future to fail with a particular cause. */
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();
    /** JUnit rule that supplies the per-test directories handed to {@code makeToolbox}. */
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    /** Current UTC time captured in {@link #setUp()}; tests use it as the timestamp of the rows they feed in. */
    private DateTime now;
    /** Single-threaded executor on which {@code runTask} runs tasks; created in {@link #setUp()}, shut down in {@link #tearDown()}. */
    private ListeningExecutorService taskExec;
    /** Hand-off callbacks recorded by the notifier stub built in {@code makeToolbox}; replaced by a fresh map on every {@code makeToolbox} call. */
    private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;

    /**
     * Prepares per-test state: registers and starts the shared emitter, creates the
     * single-threaded executor that runs tasks, and captures the current UTC time.
     */
    @Before
    public void setUp() {
        EmittingLogger.registerEmitter(EMITTER);
        EMITTER.start();

        final ExecutorService taskThread = Execs.singleThreaded("realtime-index-task-test-%d");
        taskExec = MoreExecutors.listeningDecorator(taskThread);
        now = DateTimes.nowUtc();
    }

    /**
     * Shuts down the task executor via {@code shutdownNow()}.
     *
     * <p>Tolerates a missing executor: JUnit runs {@code @After} methods even when a
     * {@code @Before} method fails, and a {@link NullPointerException} thrown here would
     * only add a second, misleading failure on top of the real one.
     */
    @After
    public void tearDown() {
        if (taskExec != null) {
            taskExec.shutdownNow();
        }
    }

    /** Checks the identifier produced by {@code RealtimeIndexTask.makeTaskId} for fixed inputs. */
    @Test
    public void testMakeTaskId() {
        final String actualId = RealtimeIndexTask.makeTaskId("test", 0, DateTimes.of("2015-01-02"), "abcdefgh");
        Assert.assertEquals("index_realtime_test_0_2015-01-02T00:00:00.000Z_abcdefgh", actualId);
    }

    /** A task created without an explicit id must use its own id as the availability group. */
    @Test(timeout=60000L)
    public void testDefaultResource() {
        final RealtimeIndexTask task = makeRealtimeTask(null);
        Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup());
    }

    /** The realtime task must report that it supports queries. */
    @Test(timeout=60000L)
    public void testSupportsQueries() {
        final RealtimeIndexTask task = makeRealtimeTask(null);
        Assert.assertTrue(task.supportsQueries());
    }

    /**
     * Runs a task built with a hand-off timeout of 100 and never fires the recorded
     * hand-off callbacks; waiting on the task's future is expected to fail with an
     * {@link ExecutionException}.
     */
    @Test(timeout=60000L, expected=ExecutionException.class)
    public void testHandoffTimeout() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null, TransformSpec.NONE, true, 100L);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the running task exposes its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1")
            )
        );
        firehose.close();

        // Poll until a segment has been published.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }

        Assert.assertEquals(1L, task.getMetrics().processed());
        Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished()));

        // Expected to throw; see the "expected" attribute on @Test.
        statusFuture.get();
    }

    /**
     * Feeds three rows (one of them a day older than {@code now}), waits for a segment to
     * be published, checks the ingestion counters and queryable sums, then fires the
     * recorded hand-off callbacks and expects the task to finish with SUCCESS.
     */
    @Test(timeout=60000L)
    public void testBasics() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the running task exposes its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
            )
        );
        firehose.close();

        // Poll until a segment has been published.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }
        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());

        // Ingestion counters reported by the task.
        Assert.assertEquals(2L, task.getMetrics().processed());
        Assert.assertEquals(1L, task.getMetrics().thrownAway());
        Assert.assertEquals(0L, task.getMetrics().unparseable());

        // The same data as seen through a timeseries query against the task.
        Assert.assertEquals(2L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        // Simulate hand-off: every recorded callback must target the published segment.
        final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
            publishedSegment.getInterval(),
            publishedSegment.getVersion(),
            publishedSegment.getShardSpec().getPartitionNum()
        );
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> callbackEntry : handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = callbackEntry.getValue();
            Assert.assertEquals(expectedDescriptor, callbackEntry.getKey());
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        final TaskStatus finalStatus = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * Same flow as the basic ingestion test, but with a transform spec that filters on
     * {@code dim1 == "foo"} and adds {@code dim1t = concat(dim1,dim1)}. Checks that the
     * counters, the derived dimension and the sums reflect the transform, then fires the
     * recorded hand-off callbacks and expects SUCCESS.
     */
    @Test(timeout=60000L)
    public void testTransformSpec() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final TransformSpec transformSpec = new TransformSpec(
            new SelectorDimFilter("dim1", "foo", null),
            ImmutableList.of(
                new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())
            )
        );
        final RealtimeIndexTask task = makeRealtimeTask(null, transformSpec, true, 0L);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the running task exposes its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", 2.0),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
            )
        );
        firehose.close();

        // Poll until a segment has been published.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }
        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());

        // Ingestion counters reported by the task.
        Assert.assertEquals(1L, task.getMetrics().processed());
        Assert.assertEquals(2L, task.getMetrics().thrownAway());
        Assert.assertEquals(0L, task.getMetrics().unparseable());

        // Query results, with and without filters on the derived dimension.
        final DimFilter foofooFilter = new SelectorDimFilter("dim1t", "foofoo", null);
        final DimFilter barbarFilter = new SelectorDimFilter("dim1t", "barbar", null);
        Assert.assertEquals(1L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(1L, sumMetric(task, foofooFilter, "rows").longValue());
        if (!NullHandling.replaceWithDefault()) {
            Assert.assertNull(sumMetric(task, barbarFilter, "rows"));
        } else {
            Assert.assertEquals(0L, sumMetric(task, barbarFilter, "rows").longValue());
        }
        Assert.assertEquals(1L, sumMetric(task, null, "met1").longValue());

        // Simulate hand-off: every recorded callback must target the published segment.
        final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
            publishedSegment.getInterval(),
            publishedSegment.getVersion(),
            publishedSegment.getShardSpec().getPartitionNum()
        );
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> callbackEntry : handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = callbackEntry.getValue();
            Assert.assertEquals(expectedDescriptor, callbackEntry.getKey());
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        final TaskStatus finalStatus = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * With parse-exception reporting enabled, rows whose {@code met1} value is the
     * non-numeric string "foo" must make the task future fail with an
     * {@link ExecutionException} caused by a {@link ParseException} naming the field.
     */
    @Test(timeout=60000L)
    public void testReportParseExceptionsOnBadMetric() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null, true);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the running task exposes its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        firehose.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
                ImmutableMap.of("t", now.minus(new Period("P1D")).getMillis(), "dim1", "foo", "met1", "foo"),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
            )
        );
        firehose.close();

        // Register the expectations immediately before the call that should throw.
        expectedException.expect(ExecutionException.class);
        expectedException.expectCause(CoreMatchers.instanceOf(ParseException.class));
        expectedException.expectCause(
            ThrowableMessageMatcher.hasMessage(
                CoreMatchers.containsString("[Unable to parse value[foo] for field[met1]")
            )
        );
        statusFuture.get();
    }

    /**
     * With parse-exception reporting disabled, problematic inputs must be counted rather
     * than fail the task. The six inputs include a {@code null} row, a row whose
     * {@code met1} is the non-numeric string "foo", a row without a timestamp that carries
     * the {@code "__fail__"} key, and a row a day older than {@code now}.
     */
    @Test(timeout=60000L)
    public void testNoReportParseExceptions() throws Exception {
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task = makeRealtimeTask(null, false);
        final TaskToolbox toolbox = makeToolbox(task, mdc, tempFolder.newFolder());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        // Poll until the running task exposes its firehose.
        while (task.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose = (TestFirehose) task.getFirehose();
        // Arrays.asList rather than ImmutableList because one element is null.
        firehose.addRows(
            Arrays.asList(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "1"),
                null,
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo", "met1", "foo"),
                ImmutableMap.of("dim1", "foo", "met1", 2.0, "__fail__", "x"),
                ImmutableMap.of("t", now.minus(Period.days(1)).getMillis(), "dim1", "foo", "met1", 2.0),
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar", "met1", 2.0)
            )
        );
        firehose.close();

        // Poll until a segment has been published.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }
        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());

        // Ingestion counters reported by the task.
        Assert.assertEquals(3L, task.getMetrics().processed());
        Assert.assertEquals(1L, task.getMetrics().thrownAway());
        Assert.assertEquals(2L, task.getMetrics().unparseable());

        // The same data as seen through a timeseries query against the task.
        Assert.assertEquals(3L, sumMetric(task, null, "rows").longValue());
        Assert.assertEquals(3L, sumMetric(task, null, "met1").longValue());

        // Simulate hand-off: every recorded callback must target the published segment.
        final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
            publishedSegment.getInterval(),
            publishedSegment.getVersion(),
            publishedSegment.getShardSpec().getPartitionNum()
        );
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> callbackEntry : handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = callbackEntry.getValue();
            Assert.assertEquals(expectedDescriptor, callbackEntry.getKey());
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        final TaskStatus finalStatus = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * Stops a first task gracefully after one row (nothing published), then starts a second
     * task with the same id and directory. The second task must already see the first row,
     * accept another one, publish a single segment and finish with SUCCESS after hand-off.
     */
    @Test(timeout=60000L)
    public void testRestore() throws Exception {
        final File directory = tempFolder.newFolder();

        // First run: add one row, then stop gracefully without closing the firehose.
        final RealtimeIndexTask task1 = makeRealtimeTask(null);
        final TestIndexerMetadataStorageCoordinator mdc1 = new TestIndexerMetadataStorageCoordinator();
        final TaskToolbox toolbox1 = makeToolbox(task1, mdc1, directory);
        final ListenableFuture<TaskStatus> statusFuture1 = runTask(task1, toolbox1);

        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose1 = (TestFirehose) task1.getFirehose();
        firehose1.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
            )
        );
        task1.stopGracefully(toolbox1.getConfig());

        final TaskStatus status1 = statusFuture1.get();
        Assert.assertEquals(TaskState.SUCCESS, status1.getStatusCode());
        // Nothing may have been published by the first run.
        Assert.assertEquals(new HashSet<>(), mdc1.getPublished());

        // Second run: same task id, same directory, fresh coordinator.
        final TestIndexerMetadataStorageCoordinator mdc2 = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
        final TaskToolbox toolbox2 = makeToolbox(task2, mdc2, directory);
        final ListenableFuture<TaskStatus> statusFuture2 = runTask(task2, toolbox2);

        while (task2.getFirehose() == null) {
            Thread.sleep(50L);
        }

        // The row added during the first run must already be queryable.
        Assert.assertEquals(1L, sumMetric(task2, null, "rows").longValue());

        final TestFirehose firehose2 = (TestFirehose) task2.getFirehose();
        firehose2.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim2", "bar")
            )
        );
        firehose2.close();

        // Poll until a segment has been published.
        while (mdc2.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }
        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc2.getPublished());

        Assert.assertEquals(2L, sumMetric(task2, null, "rows").longValue());

        // Simulate hand-off: every recorded callback must target the published segment.
        final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
            publishedSegment.getInterval(),
            publishedSegment.getVersion(),
            publishedSegment.getShardSpec().getPartitionNum()
        );
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> callbackEntry : handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = callbackEntry.getValue();
            Assert.assertEquals(expectedDescriptor, callbackEntry.getKey());
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        final TaskStatus status2 = statusFuture2.get();
        Assert.assertEquals(TaskState.SUCCESS, status2.getStatusCode());
    }

    /**
     * Lets a first task publish its segment, stops it gracefully before any hand-off
     * callback is fired, then starts a second task with the same id, task storage,
     * coordinator and directory. The second task must not publish anything new, must
     * register a hand-off callback for the already-published segment, and must finish with
     * SUCCESS once that callback runs.
     */
    @Test(timeout=60000L)
    public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception {
        final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final File directory = tempFolder.newFolder();

        // First run: ingest one row and publish.
        final RealtimeIndexTask task1 = makeRealtimeTask(null);
        final TaskToolbox toolbox1 = makeToolbox(task1, taskStorage, mdc, directory);
        final ListenableFuture<TaskStatus> statusFuture1 = runTask(task1, toolbox1);

        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose1 = (TestFirehose) task1.getFirehose();
        firehose1.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
            )
        );
        firehose1.close();

        // Poll until a segment has been published.
        while (mdc.getPublished().isEmpty()) {
            Thread.sleep(50L);
        }
        final DataSegment publishedSegment = Iterables.getOnlyElement(mdc.getPublished());

        Assert.assertEquals(1L, sumMetric(task1, null, "rows").longValue());

        // Stop the first task and wait for it to end; its outcome is deliberately not inspected.
        task1.stopGracefully(toolbox1.getConfig());
        while (!statusFuture1.isDone()) {
            Thread.sleep(50L);
        }

        // Second run: same id and shared storage, so state from the first run is visible.
        final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
        final TaskToolbox toolbox2 = makeToolbox(task2, taskStorage, mdc, directory);
        final ListenableFuture<TaskStatus> statusFuture2 = runTask(task2, toolbox2);

        while (task2.getFirehose() == null) {
            Thread.sleep(50L);
        }

        // Close immediately without adding rows.
        final TestFirehose firehose2 = (TestFirehose) task2.getFirehose();
        firehose2.close();

        // Still exactly the segment published by the first run.
        Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished());

        // Poll until the second task has registered a hand-off callback.
        while (handOffCallbacks.isEmpty()) {
            Thread.sleep(50L);
        }

        // Simulate hand-off: every recorded callback must target the published segment.
        final SegmentDescriptor expectedDescriptor = new SegmentDescriptor(
            publishedSegment.getInterval(),
            publishedSegment.getVersion(),
            publishedSegment.getShardSpec().getPartitionNum()
        );
        for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> callbackEntry : handOffCallbacks.entrySet()) {
            final Pair<Executor, Runnable> callback = callbackEntry.getValue();
            Assert.assertEquals(expectedDescriptor, callbackEntry.getKey());
            callback.lhs.execute(callback.rhs);
        }
        handOffCallbacks.clear();

        final TaskStatus status2 = statusFuture2.get();
        Assert.assertEquals(TaskState.SUCCESS, status2.getStatusCode());
    }

    /**
     * Stops a first task gracefully after one row, overwrites the {@code 00000.smoosh}
     * file under that task's persist directory with garbage, and then expects a second
     * task with the same id and directory to fail.
     */
    @Test(timeout=60000L)
    public void testRestoreCorruptData() throws Exception {
        final File directory = tempFolder.newFolder();

        // First run: add one row, then stop gracefully without closing the firehose.
        final RealtimeIndexTask task1 = makeRealtimeTask(null);
        final TestIndexerMetadataStorageCoordinator mdc1 = new TestIndexerMetadataStorageCoordinator();
        final TaskToolbox toolbox1 = makeToolbox(task1, mdc1, directory);
        final ListenableFuture<TaskStatus> statusFuture1 = runTask(task1, toolbox1);

        while (task1.getFirehose() == null) {
            Thread.sleep(50L);
        }

        final TestFirehose firehose1 = (TestFirehose) task1.getFirehose();
        firehose1.addRows(
            ImmutableList.of(
                ImmutableMap.of("t", now.getMillis(), "dim1", "foo")
            )
        );
        task1.stopGracefully(toolbox1.getConfig());

        final TaskStatus status1 = statusFuture1.get();
        Assert.assertEquals(TaskState.SUCCESS, status1.getStatusCode());
        // Nothing may have been published by the first run.
        Assert.assertEquals(new HashSet<>(), mdc1.getPublished());

        // Overwrite the persisted smoosh file for today's bucket with garbage.
        final String smooshPath = StringUtils.format(
            "%s/persistent/task/%s/work/persist/%s/%s_%s/0/00000.smoosh",
            directory,
            task1.getId(),
            task1.getDataSource(),
            Granularities.DAY.bucketStart(now),
            Granularities.DAY.bucketEnd(now)
        );
        final File smooshFile = new File(smooshPath);
        Files.write(smooshFile.toPath(), StringUtils.toUtf8("oops!"));

        // Second run: same task id, same directory, fresh coordinator.
        final TestIndexerMetadataStorageCoordinator mdc2 = new TestIndexerMetadataStorageCoordinator();
        final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
        final TaskToolbox toolbox2 = makeToolbox(task2, mdc2, directory);
        final ListenableFuture<TaskStatus> statusFuture2 = runTask(task2, toolbox2);

        try {
            statusFuture2.get();
            // Reaching this line means the task finished normally. Assert.fail throws an
            // AssertionError, which the catch below does not intercept.
            Assert.fail("expected exception");
        }
        catch (Exception expected) {
            // Any exception from the second run counts as the expected failure.
        }
    }

    /** A task asked to stop gracefully before it is ever run must still finish with SUCCESS. */
    @Test(timeout=60000L)
    public void testStopBeforeStarting() throws Exception {
        final File directory = tempFolder.newFolder();
        final RealtimeIndexTask task = makeRealtimeTask(null);
        final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
        final TaskToolbox toolbox = makeToolbox(task, mdc, directory);

        // Request the stop first, and only then submit the task for execution.
        task.stopGracefully(toolbox.getConfig());
        final ListenableFuture<TaskStatus> statusFuture = runTask(task, toolbox);

        final TaskStatus finalStatus = statusFuture.get();
        Assert.assertEquals(TaskState.SUCCESS, finalStatus.getStatusCode());
    }

    /**
     * Submits {@code task} to the test's task executor and returns a future for its status.
     *
     * <p>On the executor thread the task is first checked with {@code isReady}; a task that
     * is not ready fails with an {@link ISE}. Any exception (including that one) is logged
     * as a warning and rethrown, so it surfaces through the returned future.
     *
     * @param task    the task to run
     * @param toolbox the toolbox passed to the task and used to obtain its action client
     * @return a future that completes with the status returned by {@code task.run}
     */
    private ListenableFuture<TaskStatus> runTask(final Task task, final TaskToolbox toolbox) {
        final Callable<TaskStatus> taskRunner = new Callable<TaskStatus>(){

            @Override
            public TaskStatus call() throws Exception {
                try {
                    if (!task.isReady(toolbox.getTaskActionClient())) {
                        throw new ISE("Task is not ready");
                    }
                    return task.run(toolbox);
                }
                catch (Exception e) {
                    log.warn(e, "Task failed");
                    throw e;
                }
            }
        };
        return taskExec.submit(taskRunner);
    }

    /**
     * Builds a task with no transform, parse-exception reporting enabled and a hand-off
     * timeout of 0.
     *
     * @param taskId id for the task; tests pass {@code null} when they do not need a specific id
     */
    private RealtimeIndexTask makeRealtimeTask(String taskId) {
        final boolean reportParseExceptions = true;
        final long handoffTimeout = 0L;
        return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, handoffTimeout);
    }

    /**
     * Builds a task with no transform and a hand-off timeout of 0, with parse-exception
     * reporting switched on or off by the caller.
     *
     * @param taskId                id for the task; tests pass {@code null} when they do not need a specific id
     * @param reportParseExceptions value forwarded to the tuning config's parse-exception flag
     */
    private RealtimeIndexTask makeRealtimeTask(String taskId, boolean reportParseExceptions) {
        final long handoffTimeout = 0L;
        return makeRealtimeTask(taskId, TransformSpec.NONE, reportParseExceptions, handoffTimeout);
    }

    /**
     * Builds the task under test against data source {@code "test_ds"}.
     *
     * <p>The schema uses timestamp column {@code "t"}, dimensions {@code dim1}, {@code dim2}
     * and {@code dim1t}, a {@code rows} count and a {@code met1} long sum, with DAY segment
     * granularity. Rows come from a {@link TestFirehose}. The returned task is an anonymous
     * subclass that always reports its firehose as drainable by closing.
     *
     * @param taskId                id for the task; tests pass {@code null} when they do not need a specific id
     * @param transformSpec         transform applied by the data schema
     * @param reportParseExceptions value passed to the tuning config's parse-exception flag
     * @param handoffTimeout        value passed to the tuning config's hand-off timeout
     */
    private RealtimeIndexTask makeRealtimeTask(String taskId, TransformSpec transformSpec, boolean reportParseExceptions, long handoffTimeout) {
        final ObjectMapper objectMapper = new DefaultObjectMapper();

        // The data schema takes the row parser in its generic map form.
        final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
        final MapInputRowParser rowParser = new MapInputRowParser(
            new TimeAndDimsParseSpec(
                new TimestampSpec("t", "auto", null),
                new DimensionsSpec(
                    DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim1t"))
                )
            )
        );
        final Map<String, Object> parserMap = jsonMapper.convertValue(
            rowParser,
            JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
        );
        final AggregatorFactory[] aggregators = new AggregatorFactory[]{
            new CountAggregatorFactory("rows"),
            new LongSumAggregatorFactory("met1", "met1")
        };
        final DataSchema dataSchema = new DataSchema(
            "test_ds",
            parserMap,
            aggregators,
            new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
            transformSpec,
            objectMapper
        );

        final RealtimeIOConfig ioConfig = new RealtimeIOConfig(new TestFirehose.TestFirehoseFactory(), null);

        // Positional constructor: keep the argument order exactly as is.
        final RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
            null,
            1000,
            null,
            null,
            new Period("P1Y"),
            new Period("PT10M"),
            null,
            null,
            new ServerTimeRejectionPolicyFactory(),
            null,
            null,
            null,
            null,
            0,
            0,
            reportParseExceptions,
            handoffTimeout,
            null,
            null,
            null
        );

        final FireDepartment fireDepartment = new FireDepartment(dataSchema, ioConfig, tuningConfig);
        return new RealtimeIndexTask(taskId, null, fireDepartment, null){

            protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) {
                return true;
            }
        };
    }

    /**
     * Builds a toolbox for {@code task} backed by a fresh in-memory task storage.
     *
     * @param task      the task the toolbox is built for
     * @param mdc       metadata coordinator handed to the toolbox
     * @param directory directory whose path is given to the task configuration
     */
    private TaskToolbox makeToolbox(Task task, IndexerMetadataStorageCoordinator mdc, File directory) {
        final TaskStorage freshStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
        return makeToolbox(task, freshStorage, mdc, directory);
    }

    /**
     * Builds a toolbox for {@code task} from test doubles.
     *
     * <p>Side effects: inserts {@code task} into {@code taskStorage} as running (an existing
     * entry is tolerated) and replaces {@code handOffCallbacks} with a fresh map that the
     * hand-off notifier stub fills as callbacks are registered.
     *
     * @param task        the task the toolbox is built for
     * @param taskStorage storage the task is registered in; may already contain it
     * @param mdc         metadata coordinator handed to the lockbox and the action toolbox
     * @param directory   directory whose path is given to the task configuration
     */
    private TaskToolbox makeToolbox(Task task, TaskStorage taskStorage, IndexerMetadataStorageCoordinator mdc, File directory) {
        final TaskConfig taskConfig = new TaskConfig(
            directory.getPath(),
            null,
            null,
            Integer.valueOf(50000),
            null,
            true,
            null,
            null,
            null,
            false,
            false,
            TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
            null
        );
        final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc);
        try {
            taskStorage.insert(task, TaskStatus.running(task.getId()));
        }
        catch (EntryExistsException ignored) {
            // Deliberately ignored: the restore tests build a second toolbox for the same
            // task id, sometimes against the same storage.
        }
        taskLockbox.syncFromStorage();

        final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
            taskLockbox,
            taskStorage,
            mdc,
            EMITTER,
            EasyMock.createMock(SupervisorManager.class)
        );
        final LocalTaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
            taskStorage,
            taskActionToolbox,
            new TaskAuditLogConfig(false)
        );

        // Only timeseries queries are registered with the conglomerate.
        final DefaultQueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
            ImmutableMap.of(
                TimeseriesQuery.class,
                new TimeseriesQueryRunnerFactory(
                    new TimeseriesQueryQueryToolChest(),
                    new TimeseriesQueryEngine(),
                    new QueryWatcher(){

                        public void registerQueryFuture(Query query, ListenableFuture future) {
                        }
                    }
                )
            )
        );

        // The notifier stub only records callbacks; tests fire them by hand.
        handOffCallbacks = new ConcurrentHashMap<>();
        final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory(){

            public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) {
                return new SegmentHandoffNotifier(){

                    public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable) {
                        handOffCallbacks.put(descriptor, new Pair<>(exec, handOffRunnable));
                        return true;
                    }

                    public void start() {
                    }

                    public void close() {
                    }
                };
            }
        };

        final TestUtils testUtils = new TestUtils();
        // Positional constructor: keep the argument order exactly as is.
        final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
            taskConfig,
            null,
            taskActionClientFactory,
            EMITTER,
            new TestDataSegmentPusher(),
            new TestDataSegmentKiller(),
            null,
            null,
            new TestDataSegmentAnnouncer(),
            EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
            handoffNotifierFactory,
            () -> lambda$makeToolbox$0(conglomerate),
            DirectQueryProcessingPool.INSTANCE,
            NoopJoinableFactory.INSTANCE,
            () -> EasyMock.createMock(MonitorScheduler.class),
            new SegmentCacheManagerFactory(testUtils.getTestObjectMapper()),
            testUtils.getTestObjectMapper(),
            testUtils.getTestIndexIO(),
            MapCache.create(1024L),
            new CacheConfig(),
            new CachePopulatorStats(),
            testUtils.getIndexMergerV9Factory(),
            EasyMock.createNiceMock(DruidNodeAnnouncer.class),
            EasyMock.createNiceMock(DruidNode.class),
            new LookupNodeService("tier"),
            new DataNodeService("tier", 1000L, ServerType.INDEXER_EXECUTOR, 0),
            new NoopTestTaskReportFileWriter(),
            null,
            AuthTestUtils.TEST_AUTHORIZER_MAPPER,
            new NoopChatHandlerProvider(),
            testUtils.getRowIngestionMetersFactory(),
            new TestAppenderatorsManager(),
            new NoopOverlordClient(),
            null,
            null,
            null
        );
        return toolboxFactory.build(task);
    }

    /**
     * Sums {@code metric} over everything the task currently serves for {@code "test_ds"},
     * using a timeseries query with ALL granularity over the interval {@code 2000/3000}.
     *
     * @param task   the task whose query runner is used
     * @param filter optional filter; tests pass {@code null} for no filter
     * @param metric name of the metric, used both as the input field and the output name
     * @return {@code 0} when the query yields no result row; otherwise the value of
     *         {@code metric} in the first row, which callers must expect to be {@code null}
     */
    @Nullable
    public Long sumMetric(Task task, DimFilter filter, String metric) {
        final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("test_ds")
            .filters(filter)
            .aggregators(ImmutableList.of(new LongSumAggregatorFactory(metric, metric)))
            .granularity(Granularities.ALL)
            .intervals("2000/3000")
            .build();

        final List<Result<TimeseriesResultValue>> results =
            task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();

        // Deliberately not a ternary: mixing 0L with a Long would unbox and turn a null
        // metric into a NullPointerException instead of a null return.
        if (!results.isEmpty()) {
            return results.get(0).getValue().getLongMetric(metric);
        }
        return 0L;
    }

    /**
     * Returns {@code conglomerate} unchanged. Called from the first lambda passed to the
     * {@code TaskToolboxFactory} constructor in {@code makeToolbox}.
     * NOTE(review): the {@code lambda$...} name and the synthetic marker suggest this is a
     * decompiler rendering of a lambda body rather than hand-written code; confirm before
     * renaming or removing it.
     */
    private static /* synthetic */ QueryRunnerFactoryConglomerate lambda$makeToolbox$0(QueryRunnerFactoryConglomerate conglomerate) {
        return conglomerate;
    }
}

