/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.NoopInputFormat;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractFixedIntervalTask;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.MetadataTaskStorage;
import org.apache.druid.indexing.overlord.RealtimeishTask;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.MetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentTest;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Hours;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadablePeriod;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TaskLifecycleTest
extends InitializedNullHandlingTest {
    private static final ObjectMapper MAPPER;
    private static final IndexMergerV9Factory INDEX_MERGER_V9_FACTORY;
    private static final IndexIO INDEX_IO;
    private static final TestUtils TEST_UTILS;
    private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage";
    private static final String METADATA_TASK_STORAGE = "MetadataTaskStorage";
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final Ordering<DataSegment> BY_INTERVAL_ORDERING;
    private static DateTime now;
    private static final Iterable<InputRow> REALTIME_IDX_TASK_INPUT_ROWS;
    private static final Iterable<InputRow> IDX_TASK_INPUT_ROWS;
    @Rule
    public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
    private final String taskStorageType;
    private ObjectMapper mapper;
    private TaskStorageQueryAdapter tsqa = null;
    private TaskStorage taskStorage = null;
    private TaskLockbox taskLockbox = null;
    private TaskQueue taskQueue = null;
    private TaskRunner taskRunner = null;
    private TestIndexerMetadataStorageCoordinator mdc = null;
    private TaskActionClientFactory tac = null;
    private TaskToolboxFactory tb = null;
    private IndexSpec indexSpec;
    private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
    private MonitorScheduler monitorScheduler;
    private ServiceEmitter emitter;
    private TaskLockConfig lockConfig;
    private TaskQueueConfig tqc;
    private TaskConfig taskConfig;
    private DataSegmentPusher dataSegmentPusher;
    private DruidNode druidNode = new DruidNode("dummy", "dummy", false, Integer.valueOf(10000), null, true, false);
    private TaskLocation taskLocation = TaskLocation.create((String)this.druidNode.getHost(), (int)this.druidNode.getPlaintextPort(), (int)this.druidNode.getTlsPort());
    private int pushedSegments;
    private int announcedSinks;
    private SegmentHandoffNotifierFactory handoffNotifierFactory;
    private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
    private static CountDownLatch publishCountDown;

    @Parameterized.Parameters(name="taskStorageType={0}")
    public static Collection<String[]> constructFeed() {
        return Arrays.asList({HEAP_TASK_STORAGE}, {METADATA_TASK_STORAGE});
    }

    public TaskLifecycleTest(String taskStorageType) {
        this.taskStorageType = taskStorageType;
    }

    private static ServiceEmitter newMockEmitter() {
        return new NoopServiceEmitter();
    }

    private static InputRow ir(String dt, String dim1, String dim2, float met) {
        return new MapBasedInputRow(DateTimes.of((String)dt).getMillis(), (List)ImmutableList.of((Object)"dim1", (Object)"dim2"), (Map)ImmutableMap.of((Object)"dim1", (Object)dim1, (Object)"dim2", (Object)dim2, (Object)"met", (Object)Float.valueOf(met)));
    }

    @Before
    public void setUp() throws Exception {
        this.queryRunnerFactoryConglomerate = (QueryRunnerFactoryConglomerate)EasyMock.createStrictMock(QueryRunnerFactoryConglomerate.class);
        this.monitorScheduler = (MonitorScheduler)EasyMock.createStrictMock(MonitorScheduler.class);
        this.announcedSinks = 0;
        this.pushedSegments = 0;
        this.indexSpec = new IndexSpec();
        this.emitter = TaskLifecycleTest.newMockEmitter();
        EmittingLogger.registerEmitter((ServiceEmitter)this.emitter);
        this.mapper = TEST_UTILS.getTestObjectMapper();
        this.handOffCallbacks = new ConcurrentHashMap<SegmentDescriptor, Pair<Executor, Runnable>>();
        this.taskStorage = this.setUpTaskStorage();
        this.handoffNotifierFactory = this.setUpSegmentHandOffNotifierFactory();
        this.dataSegmentPusher = this.setUpDataSegmentPusher();
        this.mdc = this.setUpMetadataStorageCoordinator();
        this.tb = this.setUpTaskToolboxFactory(this.dataSegmentPusher, this.handoffNotifierFactory, this.mdc);
        this.taskRunner = this.setUpThreadPoolTaskRunner(this.tb);
        this.taskQueue = this.setUpTaskQueue(this.taskStorage, this.taskRunner);
    }

    private TaskStorage setUpTaskStorage() {
        HeapMemoryTaskStorage taskStorage;
        Preconditions.checkNotNull((Object)this.mapper);
        Preconditions.checkNotNull((Object)this.derbyConnectorRule);
        switch (this.taskStorageType) {
            case "HeapMemoryTaskStorage": {
                taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null){});
                break;
            }
            case "MetadataTaskStorage": {
                TestDerbyConnector testDerbyConnector = this.derbyConnectorRule.getConnector();
                this.mapper.registerSubtypes(new NamedType[]{new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory"), new NamedType(MockInputSource.class, "mockInputSource"), new NamedType(NoopInputFormat.class, "noopInputFormat")});
                testDerbyConnector.createTaskTables();
                testDerbyConnector.createSegmentTable();
                taskStorage = new MetadataTaskStorage((MetadataStorageConnector)testDerbyConnector, new TaskStorageConfig(null), (MetadataStorageActionHandlerFactory)new DerbyMetadataStorageActionHandlerFactory((SQLMetadataConnector)testDerbyConnector, (MetadataStorageTablesConfig)this.derbyConnectorRule.metadataTablesConfigSupplier().get(), this.mapper));
                break;
            }
            default: {
                throw new RE("Unknown task storage type [%s]", new Object[]{this.taskStorageType});
            }
        }
        this.tsqa = new TaskStorageQueryAdapter((TaskStorage)taskStorage, this.taskLockbox);
        return taskStorage;
    }

    private SegmentHandoffNotifierFactory setUpSegmentHandOffNotifierFactory() {
        Preconditions.checkNotNull(this.handOffCallbacks);
        return new SegmentHandoffNotifierFactory(){

            public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) {
                return new SegmentHandoffNotifier(){

                    public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable) {
                        TaskLifecycleTest.this.handOffCallbacks.put(descriptor, new Pair((Object)exec, (Object)handOffRunnable));
                        return true;
                    }

                    public void start() {
                    }

                    public void close() {
                    }
                };
            }
        };
    }

    private DataSegmentPusher setUpDataSegmentPusher() {
        return new DataSegmentPusher(){

            public String getPathForHadoop() {
                throw new UnsupportedOperationException();
            }

            @Deprecated
            public String getPathForHadoop(String dataSource) {
                return this.getPathForHadoop();
            }

            public DataSegment push(File file, DataSegment segment, boolean useUniquePath) {
                TaskLifecycleTest.this.pushedSegments++;
                return segment;
            }

            public Map<String, Object> makeLoadSpec(URI uri) {
                throw new UnsupportedOperationException();
            }
        };
    }

    private TestIndexerMetadataStorageCoordinator setUpMetadataStorageCoordinator() {
        return new TestIndexerMetadataStorageCoordinator(){

            @Override
            public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) {
                Set<DataSegment> retVal = super.announceHistoricalSegments(segments);
                if (publishCountDown != null) {
                    publishCountDown.countDown();
                }
                return retVal;
            }
        };
    }

    private TaskToolboxFactory setUpTaskToolboxFactory(DataSegmentPusher dataSegmentPusher, SegmentHandoffNotifierFactory handoffNotifierFactory, TestIndexerMetadataStorageCoordinator mdc) throws IOException {
        return this.setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc, new TestAppenderatorsManager());
    }

    private TaskToolboxFactory setUpTaskToolboxFactory(DataSegmentPusher dataSegmentPusher, SegmentHandoffNotifierFactory handoffNotifierFactory, TestIndexerMetadataStorageCoordinator mdc, AppenderatorsManager appenderatorsManager) throws IOException {
        Preconditions.checkNotNull((Object)this.queryRunnerFactoryConglomerate);
        Preconditions.checkNotNull((Object)this.monitorScheduler);
        Preconditions.checkNotNull((Object)this.taskStorage);
        Preconditions.checkNotNull((Object)this.emitter);
        this.taskLockbox = new TaskLockbox(this.taskStorage, (IndexerMetadataStorageCoordinator)mdc);
        this.tac = new LocalTaskActionClientFactory(this.taskStorage, new TaskActionToolbox(this.taskLockbox, this.taskStorage, (IndexerMetadataStorageCoordinator)mdc, this.emitter, (SupervisorManager)EasyMock.createMock(SupervisorManager.class)), new TaskAuditLogConfig(true));
        File tmpDir = this.temporaryFolder.newFolder();
        this.taskConfig = new TaskConfig(tmpDir.toString(), null, null, Integer.valueOf(50000), null, false, null, null, null, false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null);
        return new TaskToolboxFactory(this.taskConfig, new DruidNode("druid/middlemanager", "localhost", false, Integer.valueOf(8091), null, true, false), this.tac, this.emitter, dataSegmentPusher, (DataSegmentKiller)new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig()), new DataSegmentMover(){

            public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec) {
                return dataSegment;
            }
        }, new DataSegmentArchiver(){

            public DataSegment archive(DataSegment segment) {
                return segment;
            }

            public DataSegment restore(DataSegment segment) {
                return segment;
            }
        }, new DataSegmentAnnouncer(){

            public void announceSegment(DataSegment segment) {
                TaskLifecycleTest.this.announcedSinks++;
            }

            public void unannounceSegment(DataSegment segment) {
            }

            public void announceSegments(Iterable<DataSegment> segments) {
            }

            public void unannounceSegments(Iterable<DataSegment> segments) {
            }
        }, (DataSegmentServerAnnouncer)EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, () -> this.queryRunnerFactoryConglomerate, (QueryProcessingPool)DirectQueryProcessingPool.INSTANCE, (JoinableFactory)NoopJoinableFactory.INSTANCE, () -> this.monitorScheduler, new SegmentCacheManagerFactory((ObjectMapper)new DefaultObjectMapper()), MAPPER, INDEX_IO, MapCache.create((long)0L), FireDepartmentTest.NO_CACHE_CONFIG, new CachePopulatorStats(), INDEX_MERGER_V9_FACTORY, (DruidNodeAnnouncer)EasyMock.createNiceMock(DruidNodeAnnouncer.class), (DruidNode)EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), new DataNodeService("tier", 1000L, ServerType.INDEXER_EXECUTOR, 0), (TaskReportFileWriter)new NoopTestTaskReportFileWriter(), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, (ChatHandlerProvider)new NoopChatHandlerProvider(), TEST_UTILS.getRowIngestionMetersFactory(), appenderatorsManager, (OverlordClient)new NoopOverlordClient(), null, null, null);
    }

    private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory tb) {
        Preconditions.checkNotNull((Object)this.taskConfig);
        Preconditions.checkNotNull((Object)this.emitter);
        return new SingleTaskBackgroundRunner(tb, this.taskConfig, this.emitter, this.druidNode, new ServerConfig());
    }

    private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception {
        Preconditions.checkNotNull((Object)this.taskLockbox);
        Preconditions.checkNotNull((Object)this.tac);
        Preconditions.checkNotNull((Object)this.emitter);
        this.lockConfig = new TaskLockConfig();
        this.tqc = (TaskQueueConfig)this.mapper.readValue("{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}", TaskQueueConfig.class);
        return new TaskQueue(this.lockConfig, this.tqc, new DefaultTaskConfig(), ts, tr, this.tac, this.taskLockbox, this.emitter);
    }

    @After
    public void tearDown() {
        if (this.taskQueue.isActive()) {
            this.taskQueue.stop();
        }
    }

    @Test
    public void testIndexTask() throws Exception {
        IndexTask indexTask = new IndexTask(null, null, new IndexTask.IndexIngestionSpec(new DataSchema("foo", new TimestampSpec(null, null, null), DimensionsSpec.EMPTY, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, (GranularitySpec)new UniformGranularitySpec(Granularities.DAY, null, (List)ImmutableList.of((Object)Intervals.of((String)"2010-01-01/P2D"))), null), new IndexTask.IndexIOConfig(null, (InputSource)new MockInputSource(), (InputFormat)new NoopInputFormat(), Boolean.valueOf(false), Boolean.valueOf(false)), new IndexTask.IndexTuningConfig(null, Integer.valueOf(10000), null, Integer.valueOf(10), null, null, null, null, null, null, null, this.indexSpec, null, Integer.valueOf(3), Boolean.valueOf(false), null, null, null, null, null, null, null, null, null)), null);
        Optional preRunTaskStatus = this.tsqa.getStatus(indexTask.getId());
        Assert.assertTrue((String)"pre run task status not present", (!preRunTaskStatus.isPresent() ? 1 : 0) != 0);
        TaskStatus mergedStatus = this.runTask((Task)indexTask);
        TaskStatus status = (TaskStatus)this.taskStorage.getStatus(indexTask.getId()).get();
        List publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(this.mdc.getPublished());
        List loggedSegments = BY_INTERVAL_ORDERING.sortedCopy((Iterable)this.tsqa.getInsertedSegments(indexTask.getId()));
        Assert.assertEquals((String)"statusCode", (Object)TaskState.SUCCESS, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"merged statusCode", (Object)TaskState.SUCCESS, (Object)mergedStatus.getStatusCode());
        Assert.assertEquals((String)"segments logged vs published", (Object)loggedSegments, (Object)publishedSegments);
        Assert.assertEquals((String)"num segments published", (long)2L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"num segments nuked", (long)0L, (long)this.mdc.getNuked().size());
        Assert.assertEquals((String)"segment1 datasource", (Object)"foo", (Object)((DataSegment)publishedSegments.get(0)).getDataSource());
        Assert.assertEquals((String)"segment1 interval", (Object)Intervals.of((String)"2010-01-01/P1D"), (Object)((DataSegment)publishedSegments.get(0)).getInterval());
        Assert.assertEquals((String)"segment1 dimensions", (Object)ImmutableList.of((Object)"dim1", (Object)"dim2"), (Object)((DataSegment)publishedSegments.get(0)).getDimensions());
        Assert.assertEquals((String)"segment1 metrics", (Object)ImmutableList.of((Object)"met"), (Object)((DataSegment)publishedSegments.get(0)).getMetrics());
        Assert.assertEquals((String)"segment2 datasource", (Object)"foo", (Object)((DataSegment)publishedSegments.get(1)).getDataSource());
        Assert.assertEquals((String)"segment2 interval", (Object)Intervals.of((String)"2010-01-02/P1D"), (Object)((DataSegment)publishedSegments.get(1)).getInterval());
        Assert.assertEquals((String)"segment2 dimensions", (Object)ImmutableList.of((Object)"dim1", (Object)"dim2"), (Object)((DataSegment)publishedSegments.get(1)).getDimensions());
        Assert.assertEquals((String)"segment2 metrics", (Object)ImmutableList.of((Object)"met"), (Object)((DataSegment)publishedSegments.get(1)).getMetrics());
    }

    @Test
    public void testIndexTaskFailure() throws Exception {
        IndexTask indexTask = new IndexTask(null, null, new IndexTask.IndexIngestionSpec(new DataSchema("foo", null, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, (GranularitySpec)new UniformGranularitySpec(Granularities.DAY, null, (List)ImmutableList.of((Object)Intervals.of((String)"2010-01-01/P1D"))), null, this.mapper), new IndexTask.IndexIOConfig(null, (InputSource)new MockExceptionInputSource(), (InputFormat)new NoopInputFormat(), Boolean.valueOf(false), Boolean.valueOf(false)), new IndexTask.IndexTuningConfig(null, Integer.valueOf(10000), null, Integer.valueOf(10), null, null, null, null, null, null, null, this.indexSpec, null, Integer.valueOf(3), Boolean.valueOf(false), null, null, null, null, null, null, null, null, null)), null);
        TaskStatus status = this.runTask((Task)indexTask);
        Assert.assertEquals((String)"statusCode", (Object)TaskState.FAILED, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"num segments published", (long)0L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"num segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    @Test
    public void testKillUnusedSegmentsTask() throws Exception {
        final File tmpSegmentDir = this.temporaryFolder.newFolder();
        List expectedUnusedSegments = Lists.transform((List)ImmutableList.of((Object)"2011-04-01/2011-04-02", (Object)"2011-04-02/2011-04-03", (Object)"2011-04-04/2011-04-05"), (Function)new Function<String, DataSegment>(){

            public DataSegment apply(String input) {
                Interval interval = Intervals.of((String)input);
                try {
                    return DataSegment.builder().dataSource("test_kill_task").interval(interval).loadSpec((Map)ImmutableMap.of((Object)"type", (Object)"local", (Object)"path", (Object)(tmpSegmentDir.getCanonicalPath() + "/druid/localStorage/wikipedia/" + interval.getStart() + "-" + interval.getEnd() + "/2011-04-6T16:52:46.119-05:00/0/index.zip"))).version("2011-04-6T16:52:46.119-05:00").dimensions((List)ImmutableList.of()).metrics((List)ImmutableList.of()).shardSpec((ShardSpec)NoneShardSpec.instance()).binaryVersion(Integer.valueOf(9)).size(0L).build();
                }
                catch (IOException e) {
                    throw new ISE((Throwable)e, "Error creating segments", new Object[0]);
                }
            }
        });
        this.mdc.setUnusedSegments(expectedUnusedSegments);
        ArrayList<File> segmentFiles = new ArrayList<File>();
        for (DataSegment segment : this.mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of((String)"2011-04-01/P4D"))) {
            File file = new File((String)segment.getLoadSpec().get("path"));
            FileUtils.mkdirp((File)file);
            segmentFiles.add(file);
        }
        KillUnusedSegmentsTask killUnusedSegmentsTask = new KillUnusedSegmentsTask(null, "test_kill_task", Intervals.of((String)"2011-04-01/P4D"), null, Boolean.valueOf(false));
        TaskStatus status = this.runTask((Task)killUnusedSegmentsTask);
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"merged statusCode", (Object)TaskState.SUCCESS, (Object)status.getStatusCode());
        Assert.assertEquals((String)"num segments published", (long)0L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"num segments nuked", (long)3L, (long)this.mdc.getNuked().size());
        Assert.assertTrue((String)"expected unused segments get killed", (expectedUnusedSegments.containsAll(this.mdc.getNuked()) && this.mdc.getNuked().containsAll(expectedUnusedSegments) ? 1 : 0) != 0);
        for (File file : segmentFiles) {
            Assert.assertFalse((String)"unused segments files get deleted", (boolean)file.exists());
        }
    }

    @Test
    public void testRealtimeishTask() throws Exception {
        RealtimeishTask rtishTask = new RealtimeishTask();
        TaskStatus status = this.runTask((Task)rtishTask);
        Assert.assertEquals((String)"statusCode", (Object)TaskState.SUCCESS, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"num segments published", (long)2L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"num segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    @Test
    public void testNoopTask() throws Exception {
        Task noopTask = (Task)new DefaultObjectMapper().readValue("{\"type\":\"noop\", \"runTime\":\"100\"}\"", Task.class);
        TaskStatus status = this.runTask(noopTask);
        Assert.assertEquals((String)"statusCode", (Object)TaskState.SUCCESS, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"num segments published", (long)0L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"num segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    @Test
    public void testNeverReadyTask() throws Exception {
        Task neverReadyTask = (Task)new DefaultObjectMapper().readValue("{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"", Task.class);
        TaskStatus status = this.runTask(neverReadyTask);
        Assert.assertEquals((String)"statusCode", (Object)TaskState.FAILED, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"num segments published", (long)0L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"num segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    @Test
    public void testSimple() throws Exception {
        AbstractFixedIntervalTask task = new AbstractFixedIntervalTask("id1", "id1", new TaskResource("id1", 1), "ds", Intervals.of((String)"2012-01-01/P1D"), null){

            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus run(TaskToolbox toolbox) throws Exception {
                Interval interval = Intervals.of((String)"2012-01-01/P1D");
                TimeChunkLockTryAcquireAction action = new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval);
                TaskLock lock = (TaskLock)toolbox.getTaskActionClient().submit((TaskAction)action);
                if (lock == null) {
                    throw new ISE("Failed to get a lock", new Object[0]);
                }
                DataSegment segment = DataSegment.builder().dataSource("ds").interval(interval).version(lock.getVersion()).size(0L).build();
                toolbox.getTaskActionClient().submit((TaskAction)new SegmentInsertAction((Set)ImmutableSet.of((Object)segment)));
                return TaskStatus.success((String)this.getId());
            }
        };
        TaskStatus status = this.runTask((Task)task);
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"statusCode", (Object)TaskState.SUCCESS, (Object)status.getStatusCode());
        Assert.assertEquals((String)"segments published", (long)1L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    @Test
    public void testBadInterval() throws Exception {
        AbstractFixedIntervalTask task = new AbstractFixedIntervalTask("id1", "id1", "ds", Intervals.of((String)"2012-01-01/P1D"), null){

            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus run(TaskToolbox toolbox) throws Exception {
                TaskLock myLock = (TaskLock)Iterables.getOnlyElement((Iterable)((Iterable)toolbox.getTaskActionClient().submit((TaskAction)new LockListAction())));
                DataSegment segment = DataSegment.builder().dataSource("ds").interval(Intervals.of((String)"2012-01-01/P2D")).version(myLock.getVersion()).size(0L).build();
                toolbox.getTaskActionClient().submit((TaskAction)new SegmentInsertAction((Set)ImmutableSet.of((Object)segment)));
                return TaskStatus.success((String)this.getId());
            }
        };
        TaskStatus status = this.runTask((Task)task);
        Assert.assertEquals((String)"statusCode", (Object)TaskState.FAILED, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"segments published", (long)0L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    @Test
    public void testBadVersion() throws Exception {
        AbstractFixedIntervalTask task = new AbstractFixedIntervalTask("id1", "id1", "ds", Intervals.of((String)"2012-01-01/P1D"), null){

            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus run(TaskToolbox toolbox) throws Exception {
                TaskLock myLock = (TaskLock)Iterables.getOnlyElement((Iterable)((Iterable)toolbox.getTaskActionClient().submit((TaskAction)new LockListAction())));
                DataSegment segment = DataSegment.builder().dataSource("ds").interval(Intervals.of((String)"2012-01-01/P1D")).version(myLock.getVersion() + "1!!!1!!").size(0L).build();
                toolbox.getTaskActionClient().submit((TaskAction)new SegmentInsertAction((Set)ImmutableSet.of((Object)segment)));
                return TaskStatus.success((String)this.getId());
            }
        };
        TaskStatus status = this.runTask((Task)task);
        Assert.assertEquals((String)"statusCode", (Object)TaskState.FAILED, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"segments published", (long)0L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    @Test(timeout=60000L)
    public void testRealtimeIndexTask() throws Exception {
        publishCountDown = new CountDownLatch(1);
        this.monitorScheduler.addMonitor((Monitor)EasyMock.anyObject(Monitor.class));
        EasyMock.expectLastCall().atLeastOnce();
        this.monitorScheduler.removeMonitor((Monitor)EasyMock.anyObject(Monitor.class));
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay((Object[])new Object[]{this.monitorScheduler, this.queryRunnerFactoryConglomerate});
        RealtimeIndexTask realtimeIndexTask = this.newRealtimeIndexTask();
        String taskId = realtimeIndexTask.getId();
        this.taskQueue.start();
        this.taskQueue.add((Task)realtimeIndexTask);
        publishCountDown.await();
        Assert.assertEquals((long)1L, (long)this.handOffCallbacks.size());
        Pair executorRunnablePair = (Pair)Iterables.getOnlyElement(this.handOffCallbacks.values());
        ((Executor)executorRunnablePair.lhs).execute((Runnable)executorRunnablePair.rhs);
        this.handOffCallbacks.clear();
        while (((TaskStatus)this.tsqa.getStatus(taskId).get()).isRunnable()) {
            Thread.sleep(10L);
        }
        TaskStatus status = (TaskStatus)this.tsqa.getStatus(taskId).get();
        Assert.assertTrue((String)"Task should be in Success state", (boolean)status.isSuccess());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((long)1L, (long)this.announcedSinks);
        Assert.assertEquals((long)1L, (long)this.pushedSegments);
        Assert.assertEquals((long)1L, (long)this.mdc.getPublished().size());
        DataSegment segment = this.mdc.getPublished().iterator().next();
        Assert.assertEquals((Object)"test_ds", (Object)segment.getDataSource());
        Assert.assertEquals((Object)ImmutableList.of((Object)"dim1", (Object)"dim2"), (Object)segment.getDimensions());
        Assert.assertEquals((Object)Intervals.of((String)(now.toString("YYYY-MM-dd") + "/" + now.plusDays(1).toString("YYYY-MM-dd"))), (Object)segment.getInterval());
        Assert.assertEquals((Object)ImmutableList.of((Object)"count"), (Object)segment.getMetrics());
        EasyMock.verify((Object[])new Object[]{this.monitorScheduler, this.queryRunnerFactoryConglomerate});
    }

    @Test(timeout=60000L)
    public void testRealtimeIndexTaskFailure() throws Exception {
        this.dataSegmentPusher = new DataSegmentPusher(){

            @Deprecated
            public String getPathForHadoop(String s) {
                return this.getPathForHadoop();
            }

            public String getPathForHadoop() {
                throw new UnsupportedOperationException();
            }

            public DataSegment push(File file, DataSegment dataSegment, boolean useUniquePath) {
                throw new RuntimeException("FAILURE");
            }

            public Map<String, Object> makeLoadSpec(URI uri) {
                throw new UnsupportedOperationException();
            }
        };
        this.tb = this.setUpTaskToolboxFactory(this.dataSegmentPusher, this.handoffNotifierFactory, this.mdc);
        this.taskRunner = this.setUpThreadPoolTaskRunner(this.tb);
        this.taskQueue = this.setUpTaskQueue(this.taskStorage, this.taskRunner);
        this.monitorScheduler.addMonitor((Monitor)EasyMock.anyObject(Monitor.class));
        EasyMock.expectLastCall().atLeastOnce();
        this.monitorScheduler.removeMonitor((Monitor)EasyMock.anyObject(Monitor.class));
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay((Object[])new Object[]{this.monitorScheduler, this.queryRunnerFactoryConglomerate});
        RealtimeIndexTask realtimeIndexTask = this.newRealtimeIndexTask();
        String taskId = realtimeIndexTask.getId();
        this.taskQueue.start();
        this.taskQueue.add((Task)realtimeIndexTask);
        while (((TaskStatus)this.tsqa.getStatus(taskId).get()).isRunnable()) {
            Thread.sleep(10L);
        }
        TaskStatus status = (TaskStatus)this.tsqa.getStatus(taskId).get();
        Assert.assertTrue((String)"Task should be in Failure state", (boolean)status.isFailure());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        EasyMock.verify((Object[])new Object[]{this.monitorScheduler, this.queryRunnerFactoryConglomerate});
    }

    @Test
    public void testResumeTasks() throws Exception {
        IndexTask indexTask = new IndexTask(null, null, new IndexTask.IndexIngestionSpec(new DataSchema("foo", new TimestampSpec(null, null, null), DimensionsSpec.EMPTY, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, (GranularitySpec)new UniformGranularitySpec(Granularities.DAY, null, (List)ImmutableList.of((Object)Intervals.of((String)"2010-01-01/P2D"))), null), new IndexTask.IndexIOConfig(null, (InputSource)new MockInputSource(), (InputFormat)new NoopInputFormat(), Boolean.valueOf(false), Boolean.valueOf(false)), new IndexTask.IndexTuningConfig(null, Integer.valueOf(10000), null, Integer.valueOf(10), null, null, null, null, null, null, null, this.indexSpec, null, null, null, null, null, null, null, null, null, null, null, null)), null);
        long startTime = System.currentTimeMillis();
        this.taskQueue.start();
        this.taskStorage.insert((Task)indexTask, TaskStatus.running((String)indexTask.getId()));
        while (((TaskStatus)this.tsqa.getStatus(indexTask.getId()).get()).isRunnable()) {
            if (System.currentTimeMillis() > startTime + 10000L) {
                throw new ISE("Where did the task go?!: %s", new Object[]{indexTask.getId()});
            }
            Thread.sleep(100L);
        }
        TaskStatus status = (TaskStatus)this.taskStorage.getStatus(indexTask.getId()).get();
        List publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(this.mdc.getPublished());
        List loggedSegments = BY_INTERVAL_ORDERING.sortedCopy((Iterable)this.tsqa.getInsertedSegments(indexTask.getId()));
        Assert.assertEquals((String)"statusCode", (Object)TaskState.SUCCESS, (Object)status.getStatusCode());
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"segments logged vs published", (Object)loggedSegments, (Object)publishedSegments);
        Assert.assertEquals((String)"num segments published", (long)2L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"num segments nuked", (long)0L, (long)this.mdc.getNuked().size());
        Assert.assertEquals((String)"segment1 datasource", (Object)"foo", (Object)((DataSegment)publishedSegments.get(0)).getDataSource());
        Assert.assertEquals((String)"segment1 interval", (Object)Intervals.of((String)"2010-01-01/P1D"), (Object)((DataSegment)publishedSegments.get(0)).getInterval());
        Assert.assertEquals((String)"segment1 dimensions", (Object)ImmutableList.of((Object)"dim1", (Object)"dim2"), (Object)((DataSegment)publishedSegments.get(0)).getDimensions());
        Assert.assertEquals((String)"segment1 metrics", (Object)ImmutableList.of((Object)"met"), (Object)((DataSegment)publishedSegments.get(0)).getMetrics());
        Assert.assertEquals((String)"segment2 datasource", (Object)"foo", (Object)((DataSegment)publishedSegments.get(1)).getDataSource());
        Assert.assertEquals((String)"segment2 interval", (Object)Intervals.of((String)"2010-01-02/P1D"), (Object)((DataSegment)publishedSegments.get(1)).getInterval());
        Assert.assertEquals((String)"segment2 dimensions", (Object)ImmutableList.of((Object)"dim1", (Object)"dim2"), (Object)((DataSegment)publishedSegments.get(1)).getDimensions());
        Assert.assertEquals((String)"segment2 metrics", (Object)ImmutableList.of((Object)"met"), (Object)((DataSegment)publishedSegments.get(1)).getMetrics());
    }

    @Test
    public void testUnifiedAppenderatorsManagerCleanup() throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(8);
        UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager((QueryProcessingPool)new ForwardingQueryProcessingPool(exec), JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER, new WorkerConfig(), MapCache.create((long)2048L), new CacheConfig(), new CachePopulatorStats(), MAPPER, (ServiceEmitter)new NoopServiceEmitter(), () -> this.queryRunnerFactoryConglomerate);
        this.tb = this.setUpTaskToolboxFactory(this.dataSegmentPusher, this.handoffNotifierFactory, this.mdc, (AppenderatorsManager)unifiedIndexerAppenderatorsManager);
        this.taskRunner = this.setUpThreadPoolTaskRunner(this.tb);
        this.taskQueue = this.setUpTaskQueue(this.taskStorage, this.taskRunner);
        IndexTask indexTask = new IndexTask(null, null, new IndexTask.IndexIngestionSpec(new DataSchema("foo", new TimestampSpec(null, null, null), DimensionsSpec.EMPTY, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, (GranularitySpec)new UniformGranularitySpec(Granularities.DAY, null, (List)ImmutableList.of((Object)Intervals.of((String)"2010-01-01/P2D"))), null), new IndexTask.IndexIOConfig(null, (InputSource)new MockInputSource(), (InputFormat)new NoopInputFormat(), Boolean.valueOf(false), Boolean.valueOf(false)), new IndexTask.IndexTuningConfig(null, Integer.valueOf(10000), null, Integer.valueOf(10), null, null, null, null, null, null, null, this.indexSpec, null, Integer.valueOf(3), Boolean.valueOf(false), null, null, null, null, null, null, null, null, null)), null);
        Optional preRunTaskStatus = this.tsqa.getStatus(indexTask.getId());
        Assert.assertTrue((String)"pre run task status not present", (!preRunTaskStatus.isPresent() ? 1 : 0) != 0);
        TaskStatus mergedStatus = this.runTask((Task)indexTask);
        TaskStatus status = (TaskStatus)this.taskStorage.getStatus(indexTask.getId()).get();
        Assert.assertEquals((String)"statusCode", (Object)TaskState.SUCCESS, (Object)status.getStatusCode());
        Map bundleMap = unifiedIndexerAppenderatorsManager.getDatasourceBundles();
        Assert.assertEquals((long)1L, (long)bundleMap.size());
        unifiedIndexerAppenderatorsManager.removeAppenderatorsForTask(indexTask.getId(), "foo");
        Assert.assertTrue((boolean)bundleMap.isEmpty());
    }

    @Test
    public void testLockRevoked() throws Exception {
        AbstractFixedIntervalTask task = new AbstractFixedIntervalTask("id1", "id1", new TaskResource("id1", 1), "ds", Intervals.of((String)"2012-01-01/P1D"), null){

            public String getType() {
                return "test";
            }

            public void stopGracefully(TaskConfig taskConfig) {
            }

            public TaskStatus run(TaskToolbox toolbox) throws Exception {
                Interval interval = Intervals.of((String)"2012-01-01/P1D");
                TimeChunkLockTryAcquireAction action = new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval);
                TaskLock lock = (TaskLock)toolbox.getTaskActionClient().submit((TaskAction)action);
                if (lock == null) {
                    throw new ISE("Failed to get a lock", new Object[0]);
                }
                TaskLock lockBeforeRevoke = (TaskLock)toolbox.getTaskActionClient().submit((TaskAction)action);
                Assert.assertFalse((boolean)lockBeforeRevoke.isRevoked());
                TaskLifecycleTest.this.taskLockbox.revokeLock(this.getId(), lock);
                TaskLock lockAfterRevoke = (TaskLock)toolbox.getTaskActionClient().submit((TaskAction)action);
                Assert.assertTrue((boolean)lockAfterRevoke.isRevoked());
                return TaskStatus.failure((String)this.getId(), (String)"lock revoked test");
            }
        };
        TaskStatus status = this.runTask((Task)task);
        Assert.assertEquals((Object)this.taskLocation, (Object)status.getLocation());
        Assert.assertEquals((String)"statusCode", (Object)TaskState.FAILED, (Object)status.getStatusCode());
        Assert.assertEquals((String)"segments published", (long)0L, (long)this.mdc.getPublished().size());
        Assert.assertEquals((String)"segments nuked", (long)0L, (long)this.mdc.getNuked().size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TaskStatus runTask(Task task) throws Exception {
        Task dummyTask = (Task)new DefaultObjectMapper().readValue("{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"", Task.class);
        long startTime = System.currentTimeMillis();
        Preconditions.checkArgument((!task.getId().equals(dummyTask.getId()) ? 1 : 0) != 0);
        TaskLifecycleTest taskLifecycleTest = this;
        synchronized (taskLifecycleTest) {
            if (!this.taskQueue.isActive()) {
                this.taskQueue.start();
            }
        }
        this.taskQueue.add(dummyTask);
        this.taskQueue.add(task);
        TaskStatus retVal = null;
        for (String taskId : ImmutableList.of((Object)dummyTask.getId(), (Object)task.getId())) {
            try {
                TaskStatus status;
                while ((status = (TaskStatus)this.tsqa.getStatus(taskId).get()).isRunnable()) {
                    if (System.currentTimeMillis() > startTime + 10000L) {
                        throw new ISE("Where did the task go?!: %s", new Object[]{task.getId()});
                    }
                    Thread.sleep(100L);
                }
                if (!taskId.equals(task.getId())) continue;
                retVal = status;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return retVal;
    }

    private RealtimeIndexTask newRealtimeIndexTask() {
        String taskId = StringUtils.format((String)"rt_task_%s", (Object[])new Object[]{System.currentTimeMillis()});
        DataSchema dataSchema = new DataSchema("test_ds", (Map)TestHelper.makeJsonMapper().convertValue((Object)new MapInputRowParser((ParseSpec)new TimeAndDimsParseSpec(new TimestampSpec(null, null, null), DimensionsSpec.EMPTY)), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT), new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")}, (GranularitySpec)new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), null, this.mapper);
        RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig((FirehoseFactory)new MockFirehoseFactory(), null);
        RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(null, Integer.valueOf(1000), null, null, new Period((Object)"P1Y"), null, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null);
        FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
        return new RealtimeIndexTask(taskId, new TaskResource(taskId, 1), fireDepartment, null);
    }

    static {
        TEST_UTILS = new TestUtils();
        MAPPER = TEST_UTILS.getTestObjectMapper();
        INDEX_MERGER_V9_FACTORY = TEST_UTILS.getIndexMergerV9Factory();
        INDEX_IO = TEST_UTILS.getTestIndexIO();
        BY_INTERVAL_ORDERING = new Ordering<DataSegment>(){

            public int compare(DataSegment dataSegment, DataSegment dataSegment2) {
                return Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), dataSegment2.getInterval());
            }
        };
        now = DateTimes.nowUtc();
        REALTIME_IDX_TASK_INPUT_ROWS = ImmutableList.of((Object)TaskLifecycleTest.ir(now.toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 1.0f), (Object)TaskLifecycleTest.ir(now.plus((ReadablePeriod)new Period((Object)Hours.ONE)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 2.0f), (Object)TaskLifecycleTest.ir(now.plus((ReadablePeriod)new Period((Object)Hours.TWO)).toString("YYYY-MM-dd'T'HH:mm:ss"), "test_dim1", "test_dim2", 3.0f));
        IDX_TASK_INPUT_ROWS = ImmutableList.of((Object)TaskLifecycleTest.ir("2010-01-01T01", "x", "y", 1.0f), (Object)TaskLifecycleTest.ir("2010-01-01T01", "x", "z", 1.0f), (Object)TaskLifecycleTest.ir("2010-01-02T01", "a", "b", 2.0f), (Object)TaskLifecycleTest.ir("2010-01-02T01", "a", "c", 1.0f));
    }

    private static class MockFirehoseFactory
    implements FirehoseFactory {
        private MockFirehoseFactory() {
        }

        public Firehose connect(InputRowParser parser, File temporaryDirectory) {
            final Iterator inputRowIterator = REALTIME_IDX_TASK_INPUT_ROWS.iterator();
            return new Firehose(){

                public boolean hasMore() {
                    return inputRowIterator.hasNext();
                }

                @Nullable
                public InputRow nextRow() {
                    return (InputRow)inputRowIterator.next();
                }

                public void close() {
                }
            };
        }
    }

    private static class MockInputSource
    extends AbstractInputSource {
        private MockInputSource() {
        }

        protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) {
            return new InputSourceReader(){

                public CloseableIterator<InputRow> read() {
                    Iterator inputRowIterator = IDX_TASK_INPUT_ROWS.iterator();
                    return CloseableIterators.withEmptyBaggage(inputRowIterator);
                }

                public CloseableIterator<InputRowListPlusRawValues> sample() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        public boolean isSplittable() {
            return false;
        }

        public boolean needsFormat() {
            return false;
        }
    }

    private static class MockExceptionInputSource
    extends AbstractInputSource {
        private MockExceptionInputSource() {
        }

        protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) {
            return new InputSourceReader(){

                public CloseableIterator<InputRow> read() {
                    return new CloseableIterator<InputRow>(){

                        public void close() {
                        }

                        public boolean hasNext() {
                            return true;
                        }

                        public InputRow next() {
                            throw new RuntimeException("HA HA HA");
                        }
                    };
                }

                public CloseableIterator<InputRowListPlusRawValues> sample() {
                    throw new UnsupportedOperationException();
                }
            };
        }

        public boolean isSplittable() {
            return false;
        }

        public boolean needsFormat() {
            return false;
        }
    }
}

