/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SeekableStreamSupervisorStateTest
extends EasyMockSupport {
    // JSON mapper obtained from the shared test helper.
    private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();

    // Names of the datasource, stream and single shard used by the mocked expectations.
    private static final String DATASOURCE = "testDS";
    private static final String STREAM = "stream";
    private static final String SHARD_ID = "0";

    // The one partition of STREAM. It is built from the constants above so the partition id stays typed
    // as String, matching the declared StreamPartition<String>.
    private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);

    // Message carried by the exceptions that the failure-path tests inject.
    private static final String EXCEPTION_MSG = "I had an exception";

    // EasyMock mocks; re-created for every test in setupTest().
    private TaskStorage taskStorage;
    private TaskMaster taskMaster;
    private TaskRunner taskRunner;
    private TaskQueue taskQueue;
    private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private SeekableStreamIndexTaskClientFactory taskClientFactory;
    private SeekableStreamSupervisorSpec spec;
    private SeekableStreamIndexTaskClient indexTaskClient;
    private RecordSupplier<String, String, ByteEntity> recordSupplier;

    // Real (non-mock) collaborators; also re-created for every test in setupTest().
    private RowIngestionMetersFactory rowIngestionMetersFactory;
    private SupervisorStateManagerConfig supervisorConfig;
    private TestEmitter emitter;

    /**
     * Builds fresh mocks and fixtures before each test and records the expectations that every test shares.
     * The mocks are left in record state: this method never calls replayAll(), so each test can add its own
     * expectations before replaying.
     */
    @Before
    public void setupTest() {
        // Mocked collaborators, all registered with EasyMockSupport.
        taskStorage = createMock(TaskStorage.class);
        taskMaster = createMock(TaskMaster.class);
        taskRunner = createMock(TaskRunner.class);
        taskQueue = createMock(TaskQueue.class);
        indexerMetadataStorageCoordinator = createMock(IndexerMetadataStorageCoordinator.class);
        taskClientFactory = createMock(SeekableStreamIndexTaskClientFactory.class);
        spec = createMock(SeekableStreamSupervisorSpec.class);
        indexTaskClient = createMock(SeekableStreamIndexTaskClient.class);
        recordSupplier = createMock(RecordSupplier.class);

        // Real collaborators.
        rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
        supervisorConfig = new SupervisorStateManagerConfig();
        emitter = new TestEmitter();

        // Supervisor spec: fixed configuration, any number of lookups.
        EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
        EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
        EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();

        // Every task client request is answered with the same mocked client.
        EasyMock.expect(
            taskClientFactory.build(
                EasyMock.anyString(),
                EasyMock.anyObject(TaskInfoProvider.class),
                EasyMock.anyInt(),
                EasyMock.anyObject(SeekableStreamSupervisorTuningConfig.class)
            )
        ).andReturn(indexTaskClient).anyTimes();

        // The task master exposes the mocked runner and queue.
        EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
        EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();

        // Listener registration is optional: it may happen once or not at all.
        taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
        EasyMock.expectLastCall().times(0, 1);

        // No datasource metadata is stored; the stream has one assigned partition whose latest sequence is "10".
        EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
        EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
        EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.<StreamPartition<String>>anyObject()))
                .andReturn("10")
                .anyTimes();
    }

    /**
     * Happy path: the supervisor is PENDING after start() and RUNNING, with no recorded exception events,
     * after each of two successful runInternal() passes.
     */
    @Test
    public void testRunning() throws Exception {
        // The stream and task storage answer normally on every call.
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        SupervisorStateManager state = supervisor.stateManager;

        // Before the first pass nothing has run yet.
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, state.getSupervisorState().getBasicState());
        Assert.assertTrue(state.getExceptionEvents().isEmpty());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Both passes succeed and are expected to leave the supervisor in the same RUNNING state.
        for (int pass = 0; pass < 2; pass++) {
            supervisor.runInternal();
            Assert.assertTrue(state.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
            Assert.assertTrue(state.getExceptionEvents().isEmpty());
            Assert.assertTrue(state.isAtLeastOneSuccessfulRun());
        }

        verifyAll();
    }

    /**
     * The record supplier returns null for the latest sequence number. The test expects every pass to record
     * a non-stream ISE event while the supervisor stays in CREATING_TASKS, and expects it to be reported as
     * UNHEALTHY_SUPERVISOR after the third failed pass.
     */
    @Test
    public void testRunningStreamGetSequenceNumberReturnsNull() throws Exception {
        // Re-record the supplier so that the latest-sequence lookup yields null instead of "10".
        EasyMock.reset(recordSupplier);
        EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
        EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.<StreamPartition<String>>anyObject()))
                .andReturn(null)
                .anyTimes();
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        SupervisorStateManager state = supervisor.stateManager;

        // Freshly started: pending, no events, no successful run.
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, state.getSupervisorState().getBasicState());
        Assert.assertTrue(state.getExceptionEvents().isEmpty());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // First failed pass: still healthy, one ISE event describing the missing sequence number.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        List<?> exceptionEvents = state.getExceptionEvents();
        Assert.assertEquals(1L, exceptionEvents.size());
        Object firstEvent = exceptionEvents.get(0);
        Assert.assertFalse(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) firstEvent).isStreamException());
        Assert.assertEquals(ISE.class.getName(), ((SupervisorStateManager.ExceptionEvent) firstEvent).getExceptionClass());
        Assert.assertEquals(
            StringUtils.format("unable to fetch sequence number for partition[%s] from stream", SHARD_ID),
            ((SupervisorStateManager.ExceptionEvent) firstEvent).getMessage()
        );
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Second failed pass: a second event, state unchanged.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, state.getExceptionEvents().size());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Third failed pass: the supervisor is now reported as unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, state.getExceptionEvents().size());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Every partition-id lookup throws a StreamException. The test expects the supervisor to stay in
     * CONNECTING_TO_STREAM for two failed passes and to report UNABLE_TO_CONNECT_TO_STREAM after the third,
     * with each failure recorded as a stream exception event.
     */
    @Test
    public void testConnectingToStreamFail() throws Exception {
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        // The stream is unreachable on every call.
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
                .andThrow(new StreamException(new IllegalStateException(EXCEPTION_MSG)))
                .anyTimes();
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        SupervisorStateManager state = supervisor.stateManager;

        // Freshly started.
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, state.getSupervisorState());
        Assert.assertTrue(state.getExceptionEvents().isEmpty());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // First failure: one stream exception event whose class and message come from the wrapped cause.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        List<?> exceptionEvents = state.getExceptionEvents();
        Assert.assertEquals(1L, exceptionEvents.size());
        Object firstEvent = exceptionEvents.get(0);
        Assert.assertTrue(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) firstEvent).isStreamException());
        Assert.assertEquals(
            IllegalStateException.class.getName(),
            ((SupervisorStateManager.ExceptionEvent) firstEvent).getExceptionClass()
        );
        Assert.assertEquals(
            StringUtils.format("%s: %s", IllegalStateException.class.getName(), EXCEPTION_MSG),
            ((SupervisorStateManager.ExceptionEvent) firstEvent).getMessage()
        );
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Second failure: still connecting.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, state.getExceptionEvents().size());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Third failure: the supervisor gives up on connecting.
        supervisor.runInternal();
        Assert.assertFalse(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, state.getExceptionEvents().size());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Drives twelve passes in which the stream alternates between failing and succeeding in groups of three.
     * The expected sequence is: CONNECTING_TO_STREAM, then UNABLE_TO_CONNECT_TO_STREAM, recovery to RUNNING,
     * LOST_CONTACT_WITH_STREAM (because a pass had already succeeded), and a final recovery to RUNNING.
     */
    @Test
    public void testConnectingToStreamFailRecoveryFailRecovery() throws Exception {
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        // Partition lookups: 3 failures, 3 successes, 3 failures, 3 successes, consumed in this order.
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
                .andThrow(new StreamException(new IllegalStateException()))
                .times(3);
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
                .andThrow(new StreamException(new IllegalStateException()))
                .times(3);
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        SupervisorStateManager state = supervisor.stateManager;

        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, state.getSupervisorState());

        // Passes 1-2: failures, but still below the point where the supervisor is reported unhealthy.
        for (int pass = 0; pass < 2; pass++) {
            supervisor.runInternal();
            Assert.assertTrue(state.isHealthy());
            Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM, state.getSupervisorState());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        }

        // Pass 3: third failure in a row, the supervisor cannot connect.
        supervisor.runInternal();
        Assert.assertFalse(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, state.getExceptionEvents().size());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Passes 4-5: successes, but the unhealthy state is expected to persist for now.
        for (int pass = 0; pass < 2; pass++) {
            supervisor.runInternal();
            Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, state.getSupervisorState());
            Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState().getBasicState());
        }

        // Pass 6: third success in a row, the supervisor recovers.
        supervisor.runInternal();
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        Assert.assertTrue(state.isAtLeastOneSuccessfulRun());

        // Passes 7-8: failures again, state still RUNNING.
        for (int pass = 0; pass < 2; pass++) {
            supervisor.runInternal();
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState());
        }

        // Pass 9: third failure after a successful run is reported as lost contact.
        supervisor.runInternal();
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState().getBasicState());
        Assert.assertTrue(state.isAtLeastOneSuccessfulRun());

        // Pass 10: success, still unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState().getBasicState());

        // Pass 11: success, still unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.LOST_CONTACT_WITH_STREAM, state.getSupervisorState());

        // Pass 12: third success in a row, healthy again.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState());
        Assert.assertTrue(state.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Task storage fails for the first three lookups, then succeeds, then fails again. The test expects the
     * supervisor to go from DISCOVERING_INITIAL_TASKS to UNHEALTHY_SUPERVISOR, recover to RUNNING after three
     * successful passes, and drop back to UNHEALTHY_SUPERVISOR after three further failed passes.
     */
    @Test
    public void testDiscoveringInitialTasksFailRecoveryFail() throws Exception {
        EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        // Active-task lookups: 3 failures, 6 successes, 3 failures, consumed in this order.
        // NOTE(review): six successful lookups cover three successful passes, so presumably each successful
        // pass performs two lookups - confirm against the supervisor implementation.
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
                .andThrow(new IllegalStateException(EXCEPTION_MSG))
                .times(3);
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).times(6);
        EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
                .andThrow(new IllegalStateException(EXCEPTION_MSG))
                .times(3);
        EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).anyTimes();
        replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        SupervisorStateManager state = supervisor.stateManager;

        // Pass 1: first failure, recorded as a non-stream IllegalStateException event.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        List<?> exceptionEvents = state.getExceptionEvents();
        Assert.assertEquals(1L, exceptionEvents.size());
        Object firstEvent = exceptionEvents.get(0);
        Assert.assertFalse(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) firstEvent).isStreamException());
        Assert.assertEquals(
            IllegalStateException.class.getName(),
            ((SupervisorStateManager.ExceptionEvent) firstEvent).getExceptionClass()
        );
        Assert.assertEquals(EXCEPTION_MSG, ((SupervisorStateManager.ExceptionEvent) firstEvent).getMessage());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Pass 2: second failure, still discovering.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS, state.getSupervisorState());
        Assert.assertEquals(2L, state.getExceptionEvents().size());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Pass 3: third failure, the supervisor is reported unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, state.getExceptionEvents().size());
        Assert.assertFalse(state.isAtLeastOneSuccessfulRun());

        // Passes 4-5: successes, but the unhealthy state is expected to persist and the events remain.
        for (int pass = 0; pass < 2; pass++) {
            supervisor.runInternal();
            Assert.assertFalse(state.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState());
            Assert.assertEquals(3L, state.getExceptionEvents().size());
            Assert.assertTrue(state.isAtLeastOneSuccessfulRun());
        }

        // Pass 6: third success in a row, the supervisor recovers.
        supervisor.runInternal();
        Assert.assertTrue(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, state.getExceptionEvents().size());
        Assert.assertTrue(state.isAtLeastOneSuccessfulRun());

        // Passes 7-8: failures again, but the state is still RUNNING.
        for (int pass = 0; pass < 2; pass++) {
            supervisor.runInternal();
            Assert.assertTrue(state.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, state.getSupervisorState());
            Assert.assertTrue(state.isAtLeastOneSuccessfulRun());
        }

        // Pass 9: third failure in a row, unhealthy once more.
        supervisor.runInternal();
        Assert.assertFalse(state.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, state.getSupervisorState());
        Assert.assertTrue(state.isAtLeastOneSuccessfulRun());

        verifyAll();
    }

    /**
     * Runs a supervisor whose IO config carries an enabled {@code IdleConfig} through four
     * {@code runInternal()} passes spaced 100 ms apart and checks the state reported after each one.
     *
     * <p>Expected states, in order: RUNNING, RUNNING, IDLE, IDLE. The supervisor must stay healthy and
     * record no exception events throughout.
     *
     * <p>NOTE(review): the {@code 200L} handed to {@code IdleConfig} is presumably the inactivity
     * threshold in milliseconds; confirm against {@code IdleConfig}.
     */
    @Test
    public void testIdleStateTransition() throws Exception {
        InputFormat inputFormat = new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false);
        SeekableStreamSupervisorIOConfig idleEnabledIoConfig = new SeekableStreamSupervisorIOConfig(STREAM, inputFormat, 1, 1, new Period("PT1H"), new Period("PT1S"), new Period("PT30S"), false, new Period("PT30M"), null, null, null, null, new IdleConfig(true, 200L)){};
        DruidMonitorSchedulerConfig schedulerConfig = new DruidMonitorSchedulerConfig(){

            public Duration getEmitterPeriod() {
                return new Period("PT1S").toStandardDuration();
            }
        };

        // Start from a clean spec mock so that the idle-enabled IO config is the one handed out.
        EasyMock.reset(this.spec);
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(idleEnabledIoConfig).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.getMonitorSchedulerConfig()).andReturn(schedulerConfig).anyTimes();
        EasyMock.expect(this.spec.getType()).andReturn("test").anyTimes();
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        this.replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();

        // Before the first run the supervisor is PENDING with no successful run recorded.
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        SupervisorStateManager.BasicState[] expectedAfterEachRun = {
            SupervisorStateManager.BasicState.RUNNING,
            SupervisorStateManager.BasicState.RUNNING,
            SupervisorStateManager.BasicState.IDLE,
            SupervisorStateManager.BasicState.IDLE
        };
        for (int pass = 0; pass < expectedAfterEachRun.length; pass++) {
            // Every pass except the first is preceded by a 100 ms pause.
            if (pass > 0) {
                Thread.sleep(100L);
            }
            supervisor.runInternal();

            SupervisorStateManager.BasicState expected = expectedAfterEachRun[pass];
            Assert.assertTrue(supervisor.stateManager.isHealthy());
            Assert.assertEquals(expected, supervisor.stateManager.getSupervisorState());
            Assert.assertEquals(expected, supervisor.stateManager.getSupervisorState().getBasicState());
            Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
            Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
        }
        this.verifyAll();
    }

    /**
     * Scripts {@code taskQueue.add} to throw three times, succeed three times and then throw three more
     * times, and checks the supervisor state after each of the nine {@code runInternal()} passes.
     *
     * <p>Expected progression: CREATING_TASKS (healthy) for the first two failures, UNHEALTHY_SUPERVISOR
     * from the third failure through the first two successes, RUNNING (healthy) from the third success
     * through the next two failures, and UNHEALTHY_SUPERVISOR again on the final failure.
     */
    @Test
    public void testCreatingTasksFailRecoveryFail() throws Exception {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        // Order matters: the three expectations below are consumed one after the other.
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).times(3);
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
        this.replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();

        // Failure #1: still healthy, one non-stream exception event recorded.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        List<?> eventsAfterFirstFailure = supervisor.stateManager.getExceptionEvents();
        Assert.assertEquals(1L, eventsAfterFirstFailure.size());
        Object firstEvent = eventsAfterFirstFailure.get(0);
        Assert.assertFalse(((SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent) firstEvent).isStreamException());
        Assert.assertEquals(IllegalStateException.class.getName(), ((SupervisorStateManager.ExceptionEvent) firstEvent).getExceptionClass());
        Assert.assertEquals(EXCEPTION_MSG, ((SupervisorStateManager.ExceptionEvent) firstEvent).getMessage());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Failure #2: still healthy, two exception events.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(2L, supervisor.stateManager.getExceptionEvents().size());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Failure #3: the supervisor is now reported unhealthy.
        supervisor.runInternal();
        Assert.assertFalse(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, supervisor.stateManager.getExceptionEvents().size());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Successes #1 and #2: a successful run is recorded but the unhealthy state persists.
        for (int success = 0; success < 2; success++) {
            supervisor.runInternal();
            Assert.assertFalse(supervisor.stateManager.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
            Assert.assertEquals(3L, supervisor.stateManager.getExceptionEvents().size());
            Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
        }

        // Success #3: back to healthy RUNNING; the three earlier exception events are still listed.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertEquals(3L, supervisor.stateManager.getExceptionEvents().size());
        Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Failures #4 and #5: RUNNING and healthy are still reported.
        for (int failure = 0; failure < 2; failure++) {
            supervisor.runInternal();
            Assert.assertTrue(supervisor.stateManager.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
            Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
        }

        // Failure #6: unhealthy again.
        supervisor.runInternal();
        Assert.assertFalse(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
        Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
        this.verifyAll();
    }

    /**
     * Checks that a supervisor whose spec reports {@code isSuspended() == true} goes from PENDING to
     * SUSPENDED on its first run and remains SUSPENDED, healthy and exception-free on the second.
     */
    @Test
    public void testSuspended() throws Exception {
        EasyMock.expect(this.spec.isSuspended()).andReturn(true).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        this.replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();

        // Freshly started: PENDING, nothing has run yet.
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Two consecutive runs must both leave the supervisor SUSPENDED.
        for (int run = 0; run < 2; run++) {
            supervisor.runInternal();
            Assert.assertTrue(supervisor.stateManager.isHealthy());
            Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
            Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
            Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
            Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
        }
        this.verifyAll();
    }

    /**
     * Checks the PENDING, RUNNING, STOPPING progression when the supervisor is stopped with
     * {@code stop(false)} after one successful run.
     */
    @Test
    public void testStopping() throws Exception {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        // Void calls recorded on the mocks before replay: these are expected to happen during the test.
        this.taskRunner.unregisterListener("testSupervisorId");
        this.indexTaskClient.close();
        this.recordSupplier.close();
        this.replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();

        // Started but not yet run.
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // One successful run.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Stop with the flag set to false.
        supervisor.stop(false);
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());
        this.verifyAll();
    }

    /**
     * Checks that after {@code stop(true)} the supervisor reports STOPPING and that a further
     * {@code runInternal()} call does not move it out of STOPPING.
     */
    @Test
    public void testStoppingGracefully() throws Exception {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        // Void calls recorded on the mocks before replay: these are expected to happen during the test.
        this.taskRunner.unregisterListener("testSupervisorId");
        this.indexTaskClient.close();
        this.recordSupplier.close();
        this.replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();

        // Started but not yet run.
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // One successful run.
        supervisor.runInternal();
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        // Stop with the flag set to true.
        supervisor.stop(true);
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());

        // Running again after the stop must leave the state at STOPPING.
        supervisor.runInternal();
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());
        this.verifyAll();
    }

    /**
     * Supplies both a record-lag map and a time-lag map to an emitting supervisor and checks that the
     * six lag metrics are emitted in the listed order with the expected values.
     *
     * <p>The wait on the latch has no timeout.
     */
    @Test
    public void testEmitBothLag() throws Exception {
        this.expectEmitterSupervisor(false);
        CountDownLatch emitted = new CountDownLatch(1);
        Map<String, Long> recordLag = ImmutableMap.of("1", 100L, "2", 250L, "3", 500L);
        Map<String, Long> timeLag = ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(emitted, 1, recordLag, timeLag);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        emitted.await();
        List<Event> allEvents = this.emitter.getEvents();
        // The whitelist doubles as the expected emission order.
        List<String> expectedMetrics = Arrays.asList("ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
        Long[] expectedValues = {850L, 500L, 283L, 45000L, 20000L, 15000L};
        List<Event> lagEvents = this.filterMetrics(allEvents, expectedMetrics);

        Assert.assertEquals(6L, lagEvents.size());
        for (int i = 0; i < expectedValues.length; i++) {
            Assert.assertEquals(expectedMetrics.get(i), lagEvents.get(i).toMap().get("metric"));
            Assert.assertEquals(expectedValues[i], lagEvents.get(i).toMap().get("value"));
        }
        this.verifyAll();
    }

    /**
     * Supplies only a record-lag map (time lag is {@code null}) and checks that the three record-lag
     * metrics are emitted in the listed order with the expected values.
     *
     * <p>The wait on the latch has no timeout.
     */
    @Test
    public void testEmitRecordLag() throws Exception {
        this.expectEmitterSupervisor(false);
        CountDownLatch emitted = new CountDownLatch(1);
        Map<String, Long> recordLag = ImmutableMap.of("1", 100L, "2", 250L, "3", 500L);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(emitted, 1, recordLag, null);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        emitted.await();
        List<Event> allEvents = this.emitter.getEvents();
        // The whitelist doubles as the expected emission order.
        List<String> expectedMetrics = Arrays.asList("ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag");
        Long[] expectedValues = {850L, 500L, 283L};
        List<Event> lagEvents = this.filterMetrics(allEvents, expectedMetrics);

        Assert.assertEquals(3L, lagEvents.size());
        for (int i = 0; i < expectedValues.length; i++) {
            Assert.assertEquals(expectedMetrics.get(i), lagEvents.get(i).toMap().get("metric"));
            Assert.assertEquals(expectedValues[i], lagEvents.get(i).toMap().get("value"));
        }
        this.verifyAll();
    }

    /**
     * Supplies only a time-lag map (record lag is {@code null}) and checks that the three time-lag
     * metrics are emitted in the listed order with the expected values.
     *
     * <p>The wait on the latch has no timeout.
     */
    @Test
    public void testEmitTimeLag() throws Exception {
        this.expectEmitterSupervisor(false);
        CountDownLatch emitted = new CountDownLatch(1);
        Map<String, Long> timeLag = ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(emitted, 1, null, timeLag);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        emitted.await();
        List<Event> allEvents = this.emitter.getEvents();
        // The whitelist doubles as the expected emission order.
        List<String> expectedMetrics = Arrays.asList("ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
        Long[] expectedValues = {45000L, 20000L, 15000L};
        List<Event> lagEvents = this.filterMetrics(allEvents, expectedMetrics);

        Assert.assertEquals(3L, lagEvents.size());
        for (int i = 0; i < expectedValues.length; i++) {
            Assert.assertEquals(expectedMetrics.get(i), lagEvents.get(i).toMap().get("metric"));
            Assert.assertEquals(expectedValues[i], lagEvents.get(i).toMap().get("value"));
        }
        this.verifyAll();
    }

    /**
     * Checks that exactly one {@code ingest/notices/queueSize} event is emitted, with value 0 and the
     * test datasource as its {@code dataSource} field.
     *
     * <p>The wait on the latch has no timeout.
     */
    @Test
    public void testEmitNoticesQueueSize() throws Exception {
        this.expectEmitterSupervisor(false);
        CountDownLatch emitted = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(emitted, 2, null, null);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        emitted.await();
        List<Event> allEvents = this.emitter.getEvents();
        List<Event> queueSizeEvents = this.filterMetrics(allEvents, Collections.singletonList("ingest/notices/queueSize"));
        Assert.assertEquals(1L, queueSizeEvents.size());

        Event queueSizeEvent = queueSizeEvents.get(0);
        Assert.assertEquals("ingest/notices/queueSize", queueSizeEvent.toMap().get("metric"));
        Assert.assertEquals(0, queueSizeEvent.toMap().get("value"));
        Assert.assertEquals(DATASOURCE, queueSizeEvent.toMap().get("dataSource"));
        this.verifyAll();
    }

    /**
     * Checks that exactly one {@code ingest/notices/time} event is emitted, with a positive
     * {@code Long} value, the test datasource, and {@code noticeType} equal to {@code "run_notice"}.
     *
     * <p>The wait on the latch has no timeout.
     */
    @Test
    public void testEmitNoticesTime() throws Exception {
        this.expectEmitterSupervisor(false);
        CountDownLatch emitted = new CountDownLatch(1);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(emitted, 4, null, null);
        supervisor.start();

        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
        Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

        emitted.await();
        List<Event> allEvents = this.emitter.getEvents();
        List<Event> noticeTimeEvents = this.filterMetrics(allEvents, Collections.singletonList("ingest/notices/time"));
        Assert.assertEquals(1L, noticeTimeEvents.size());

        Event noticeTimeEvent = noticeTimeEvents.get(0);
        Assert.assertEquals("ingest/notices/time", noticeTimeEvent.toMap().get("metric"));
        // The emitted value itself is used as the failure message.
        Assert.assertTrue(String.valueOf(noticeTimeEvent.toMap().get("value")), (Long) noticeTimeEvent.toMap().get("value") > 0L);
        Assert.assertEquals(DATASOURCE, noticeTimeEvent.toMap().get("dataSource"));
        Assert.assertEquals("run_notice", noticeTimeEvent.toMap().get("noticeType"));
        this.verifyAll();
    }

    /**
     * Checks that a suspended supervisor emits none of the six lag metrics even though both lag maps
     * are supplied.
     *
     * <p>The wait on the latch has no timeout.
     */
    @Test
    public void testEmitNoLagWhenSuspended() throws Exception {
        this.expectEmitterSupervisor(true);
        CountDownLatch emitted = new CountDownLatch(1);
        Map<String, Long> recordLag = ImmutableMap.of("1", 100L, "2", 250L, "3", 500L);
        Map<String, Long> timeLag = ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L);
        TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(emitted, 1, recordLag, timeLag);
        supervisor.start();
        supervisor.runInternal();

        // After one run the suspended spec must be reflected in the state.
        Assert.assertTrue(supervisor.stateManager.isHealthy());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
        Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
        Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());

        emitted.await();
        List<Event> allEvents = this.emitter.getEvents();
        List<String> lagMetrics = Arrays.asList("ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
        List<Event> lagEvents = this.filterMetrics(allEvents, lagMetrics);
        Assert.assertEquals(0L, lagEvents.size());
        this.verifyAll();
    }

    /**
     * Registers {@code task1} in an actively-reading task group and {@code task2} in a
     * pending-completion task group, then checks that {@code getStats()} returns a single entry keyed
     * by {@code SHARD_ID} containing the moving averages reported for both tasks.
     */
    @Test
    public void testGetStats() {
        EasyMock.expect(this.spec.isSuspended()).andReturn(false);
        // Each task's moving averages are expected to be requested exactly once.
        EasyMock.expect(this.indexTaskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of("prop1", "val1"))).times(1);
        EasyMock.expect(this.indexTaskClient.getMovingAveragesAsync("task2")).andReturn(Futures.immediateFuture(ImmutableMap.of("prop2", "val2"))).times(1);
        this.replayAll();

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
        supervisor.start();
        supervisor.addTaskGroupToActivelyReadingTaskGroup(
            supervisor.getTaskGroupIdForPartition(SHARD_ID),
            ImmutableMap.of(SHARD_ID, SHARD_ID),
            Optional.absent(),
            Optional.absent(),
            ImmutableSet.of("task1"),
            ImmutableSet.of()
        );
        supervisor.addTaskGroupToPendingCompletionTaskGroup(
            supervisor.getTaskGroupIdForPartition("1"),
            ImmutableMap.of("1", SHARD_ID),
            Optional.absent(),
            Optional.absent(),
            ImmutableSet.of("task2"),
            ImmutableSet.of()
        );

        Map<?, ?> stats = supervisor.getStats();
        this.verifyAll();

        Assert.assertEquals(1L, stats.size());
        Assert.assertEquals(ImmutableSet.of(SHARD_ID), stats.keySet());
        Assert.assertEquals(
            ImmutableMap.of("task1", ImmutableMap.of("prop1", "val1"), "task2", ImmutableMap.of("prop2", "val2")),
            stats.get(SHARD_ID)
        );
    }

    /**
     * Returns the events whose {@code "metric"} field is on the whitelist, preserving their order.
     *
     * <p>Events whose map has no {@code "metric"} entry are skipped rather than causing a
     * {@link NullPointerException}, so the helper is safe to use on a mixed event list.
     *
     * @param events    events to filter
     * @param whitelist metric names to keep
     * @return a new list containing only the whitelisted metric events
     */
    private List<Event> filterMetrics(List<Event> events, List<String> whitelist) {
        return events.stream()
            .filter(event -> {
                Object metric = event.toMap().get("metric");
                // An event without a metric name cannot be on the whitelist.
                return metric != null && whitelist.contains(metric.toString());
            })
            .collect(Collectors.toList());
    }

    /**
     * Replaces {@code this.spec} with a fresh mock, records the expectations shared by the emitter
     * tests (one-second emitter period, type {@code "test"}, a single partition, no active tasks,
     * task submission always succeeding) and switches all mocks to replay mode.
     *
     * @param suspended value the spec mock returns from {@code isSuspended()}
     * @throws EntryExistsException declared because {@code taskQueue.add} is invoked while recording
     */
    private void expectEmitterSupervisor(boolean suspended) throws EntryExistsException {
        this.spec = (SeekableStreamSupervisorSpec) this.createMock(SeekableStreamSupervisorSpec.class);

        InputFormat inputFormat = new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false);
        SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(STREAM, inputFormat, 1, 1, new Period("PT1H"), new Period("PT1S"), new Period("PT30S"), false, new Period("PT30M"), null, null, null, null, null){};
        DruidMonitorSchedulerConfig schedulerConfig = new DruidMonitorSchedulerConfig(){

            public Duration getEmitterPeriod() {
                return new Period("PT1S").toStandardDuration();
            }
        };

        // Spec expectations.
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(ioConfig).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.getMonitorSchedulerConfig()).andReturn(schedulerConfig).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(suspended).anyTimes();
        EasyMock.expect(this.spec.getType()).andReturn("test").anyTimes();

        // Collaborator expectations.
        EasyMock.expect(this.recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
        EasyMock.expect(this.taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
        EasyMock.expect(this.taskQueue.add((Task) EasyMock.anyObject())).andReturn(true).anyTimes();
        this.replayAll();
    }

    /**
     * Builds the data schema shared by the tests in this class.
     *
     * <p>The schema targets {@code DATASOURCE}, reads an ISO timestamp from the {@code "timestamp"}
     * column, declares the string dimensions {@code dim1} and {@code dim2}, has a single count
     * aggregator named {@code "rows"}, and uses a uniform granularity spec built from
     * {@code Granularities.HOUR} and {@code Granularities.NONE} with an empty interval list.
     *
     * @return a newly constructed schema on every call
     */
    private static DataSchema getDataSchema() {
        final ArrayList<StringDimensionSchema> dimensionSchemas = new ArrayList<StringDimensionSchema>();
        for (String dimensionName : new String[]{"dim1", "dim2"}) {
            dimensionSchemas.add(StringDimensionSchema.create((String)dimensionName));
        }

        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
        final DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionSchemas);
        final AggregatorFactory[] aggregators = new AggregatorFactory[]{new CountAggregatorFactory("rows")};
        final GranularitySpec granularitySpec = (GranularitySpec)new UniformGranularitySpec(
            Granularities.HOUR,
            Granularities.NONE,
            (List)ImmutableList.of()
        );

        return new DataSchema(DATASOURCE, timestampSpec, dimensionsSpec, aggregators, granularitySpec, null);
    }

    /**
     * Builds a supervisor IO config for {@code STREAM} with a JSON input format and an auto-scaler
     * config.
     *
     * <p>The auto-scaler config is produced by converting the map from {@link #getProperties()} with
     * {@code OBJECT_MAPPER}.
     *
     * @return a newly constructed (anonymous subclass) IO config on every call
     */
    private static SeekableStreamSupervisorIOConfig getIOConfig() {
        final InputFormat inputFormat = (InputFormat)new JsonInputFormat(
            new JSONPathSpec(Boolean.valueOf(true), (List)ImmutableList.of()),
            (Map)ImmutableMap.of(),
            Boolean.valueOf(false),
            Boolean.valueOf(false),
            Boolean.valueOf(false)
        );
        final AutoScalerConfig autoScalerConfig = (AutoScalerConfig)OBJECT_MAPPER.convertValue(
            SeekableStreamSupervisorStateTest.getProperties(),
            AutoScalerConfig.class
        );

        return new SeekableStreamSupervisorIOConfig(
            STREAM,
            inputFormat,
            1,
            1,
            new Period((Object)"PT1H"),
            new Period((Object)"P1D"),
            new Period((Object)"PT30S"),
            false,
            new Period((Object)"PT30M"),
            null,
            null,
            autoScalerConfig,
            null,
            null
        ){};
    }

    /**
     * Returns the raw auto-scaler settings used by the tests, keyed by property name.
     *
     * @return a fresh, mutable {@link HashMap} with fourteen entries on every call
     */
    private static Map<String, Object> getProperties() {
        final Map<String, Object> props = new HashMap<>();

        // Switch and lag sampling.
        props.put("enableTaskAutoScaler", true);
        props.put("lagCollectionIntervalMillis", 500);
        props.put("lagCollectionRangeMillis", 500);

        // Scale-out / scale-in triggers.
        props.put("scaleOutThreshold", 5000000);
        props.put("triggerScaleOutFractionThreshold", 0.3);
        props.put("scaleInThreshold", 1000000);
        props.put("triggerScaleInFractionThreshold", 0.8);

        // Scheduling of scale actions.
        props.put("scaleActionStartDelayMillis", 0);
        props.put("scaleActionPeriodMillis", 100);

        // Task-count bounds and step sizes.
        props.put("taskCountMax", 8);
        props.put("taskCountMin", 1);
        props.put("scaleInStep", 1);
        props.put("scaleOutStep", 2);

        props.put("minTriggerScaleActionFrequencyMillis", 1200000);
        return props;
    }

    /**
     * Builds the fixed supervisor tuning config used by the tests.
     *
     * <p>The returned config reports one worker thread, one chat thread, one chat retry,
     * {@code getChatAsync() == false}, and the durations spelled out in the individual getters below.
     *
     * @return a new anonymous {@code SeekableStreamSupervisorTuningConfig} on every call
     */
    private static SeekableStreamSupervisorTuningConfig getTuningConfig() {
        return new SeekableStreamSupervisorTuningConfig(){

            @Override
            public Integer getWorkerThreads() {
                return 1;
            }

            @Override
            public boolean getChatAsync() {
                return false;
            }

            @Override
            public Integer getChatThreads() {
                return 1;
            }

            @Override
            public Long getChatRetries() {
                return 1L;
            }

            /** One minute. */
            @Override
            public Duration getHttpTimeout() {
                return new Period((Object)"PT1M").toStandardDuration();
            }

            /** One second. */
            @Override
            public Duration getShutdownTimeout() {
                return new Period((Object)"PT1S").toStandardDuration();
            }

            /** Two minutes. */
            @Override
            public Duration getRepartitionTransitionDuration() {
                return new Period((Object)"PT2M").toStandardDuration();
            }

            /** Five minutes. */
            @Override
            public Duration getOffsetFetchPeriod() {
                return new Period((Object)"PT5M").toStandardDuration();
            }

            /**
             * Returns a task tuning config constructed with every argument {@code null}; its
             * {@code withBasePersistDirectory} and {@code toString} both return {@code null}.
             */
            @Override
            public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() {
                return new SeekableStreamIndexTaskTuningConfig(
                    null, null, null, null, null,
                    null, null, null, null, null,
                    null, null, null, null, null,
                    null, null, null, null, null
                ){

                    @Override
                    public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) {
                        return null;
                    }

                    @Override
                    public String toString() {
                        return null;
                    }
                };
            }
        };
    }

    private static class TestEmitter
    extends NoopServiceEmitter {
        @GuardedBy(value="events")
        private final List<Event> events = new ArrayList<Event>();

        private TestEmitter() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void emit(Event event) {
            List<Event> list = this.events;
            synchronized (list) {
                this.events.add(event);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public List<Event> getEvents() {
            List<Event> list = this.events;
            synchronized (list) {
                return ImmutableList.copyOf(this.events);
            }
        }
    }

    /**
     * Supervisor used by the metric-emission tests.
     *
     * <p>{@code metricFlag} is a bit mask of {@link #LAG}, {@link #NOTICE_QUEUE} and
     * {@link #NOTICE_PROCESS}; an emit method whose bit is not set does nothing. Enabled emit methods
     * delegate to the superclass and count the supplied latch down (for lag, only while the state
     * manager reports a steady state).
     */
    private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor {
        /** Mask bit enabling {@link #emitLag()}. */
        private static final byte LAG = 1;
        /** Mask bit enabling {@link #emitNoticesQueueSize()}. */
        private static final byte NOTICE_QUEUE = 2;
        /** Mask bit enabling {@link #emitNoticeProcessTime(String, long)}. */
        private static final byte NOTICE_PROCESS = 4;

        private final CountDownLatch latch;
        private final byte metricFlag;
        private final Map<String, Long> partitionsRecordLag;
        private final Map<String, Long> partitionsTimeLag;

        /**
         * @param latch               latch counted down by the enabled emit methods
         * @param metricFlag          bit mask selecting which emit methods are active
         * @param partitionsRecordLag value returned from {@link #getPartitionRecordLag()}
         * @param partitionsTimeLag   value returned from {@link #getPartitionTimeLag()}
         */
        TestEmittingTestSeekableStreamSupervisor(CountDownLatch latch, byte metricFlag, Map<String, Long> partitionsRecordLag, Map<String, Long> partitionsTimeLag) {
            this.latch = latch;
            this.metricFlag = metricFlag;
            this.partitionsRecordLag = partitionsRecordLag;
            this.partitionsTimeLag = partitionsTimeLag;
        }

        /** Tells whether {@code metricBit} is set in the mask given at construction. */
        private boolean isMetricEnabled(byte metricBit) {
            return (this.metricFlag & metricBit) != 0;
        }

        @Override
        @Nullable
        protected Map<String, Long> getPartitionRecordLag() {
            return this.partitionsRecordLag;
        }

        @Override
        @Nullable
        protected Map<String, Long> getPartitionTimeLag() {
            return this.partitionsTimeLag;
        }

        /** Emits lag when {@link #LAG} is enabled; counts the latch down only in steady state. */
        @Override
        protected void emitLag() {
            if (isMetricEnabled(LAG)) {
                super.emitLag();
                if (this.stateManager.isSteadyState()) {
                    this.latch.countDown();
                }
            }
        }

        /** Emits the notice queue size and counts the latch down when {@link #NOTICE_QUEUE} is enabled. */
        @Override
        protected void emitNoticesQueueSize() {
            if (isMetricEnabled(NOTICE_QUEUE)) {
                super.emitNoticesQueueSize();
                this.latch.countDown();
            }
        }

        /** Emits the notice process time and counts the latch down when {@link #NOTICE_PROCESS} is enabled. */
        @Override
        public void emitNoticeProcessTime(String noticeType, long timeInMillis) {
            if (isMetricEnabled(NOTICE_PROCESS)) {
                super.emitNoticeProcessTime(noticeType, timeInMillis);
                this.latch.countDown();
            }
        }

        /** Always returns {@code null}. */
        @Override
        public LagStats computeLagStats() {
            return null;
        }

        /**
         * Schedules {@link #emitLag()} and {@link #emitNoticesQueueSize()} at a fixed rate, using the
         * IO config's start delay as the initial delay and the monitor scheduler's emitter period as
         * the interval (both in milliseconds).
         */
        @Override
        protected void scheduleReporting(ScheduledExecutorService reportingExec) {
            SeekableStreamSupervisorIOConfig ioConfig = SeekableStreamSupervisorStateTest.this.spec.getIoConfig();
            reportingExec.scheduleAtFixedRate(
                this::emitLag,
                ioConfig.getStartDelay().getMillis(),
                SeekableStreamSupervisorStateTest.this.spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
                TimeUnit.MILLISECONDS
            );
            reportingExec.scheduleAtFixedRate(
                this::emitNoticesQueueSize,
                ioConfig.getStartDelay().getMillis(),
                SeekableStreamSupervisorStateTest.this.spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
                TimeUnit.MILLISECONDS
            );
        }
    }

    private class TestSeekableStreamSupervisor
    extends BaseTestSeekableStreamSupervisor {
        private TestSeekableStreamSupervisor() {
        }

        protected void scheduleReporting(ScheduledExecutorService reportingExec) {
        }

        public LagStats computeLagStats() {
            return new LagStats(0L, 0L, 0L);
        }
    }

    /**
     * Common base for the test supervisors in this class.
     *
     * <p>It is constructed from the enclosing test's collaborators and supplies fixed stub answers
     * for the supervisor hooks implemented below. Subclasses in this file add lag-stat computation
     * and reporting behaviour.
     */
    private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity> {
        private BaseTestSeekableStreamSupervisor() {
            super(
                "testSupervisorId",
                SeekableStreamSupervisorStateTest.this.taskStorage,
                SeekableStreamSupervisorStateTest.this.taskMaster,
                SeekableStreamSupervisorStateTest.this.indexerMetadataStorageCoordinator,
                SeekableStreamSupervisorStateTest.this.taskClientFactory,
                OBJECT_MAPPER,
                SeekableStreamSupervisorStateTest.this.spec,
                SeekableStreamSupervisorStateTest.this.rowIngestionMetersFactory,
                false
            );
        }

        @Override
        protected String baseTaskName() {
            return "test";
        }

        /** Intentionally empty. */
        @Override
        protected void updatePartitionLagFromStream() {
        }

        @Override
        @Nullable
        protected Map<String, Long> getPartitionRecordLag() {
            return null;
        }

        @Override
        @Nullable
        protected Map<String, Long> getPartitionTimeLag() {
            return null;
        }

        /**
         * Builds a task IO config for {@code STREAM} from the given start/end partitions and message
         * time bounds, taking the input format from {@code ioConfig}.
         */
        @Override
        protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int groupId, Map<String, String> startPartitions, Map<String, String> endPartitions, String baseSequenceName, DateTime minimumMessageTime, DateTime maximumMessageTime, Set<String> exclusiveStartSequenceNumberPartitions, SeekableStreamSupervisorIOConfig ioConfig) {
            return new SeekableStreamIndexTaskIOConfig<String, String>(
                Integer.valueOf(groupId),
                baseSequenceName,
                new SeekableStreamStartSequenceNumbers(SeekableStreamSupervisorStateTest.STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
                new SeekableStreamEndSequenceNumbers(SeekableStreamSupervisorStateTest.STREAM, endPartitions),
                Boolean.valueOf(true),
                minimumMessageTime,
                maximumMessageTime,
                ioConfig.getInputFormat()
            ){};
        }

        /**
         * Returns a single-element list holding one test task with id {@code "id"}; the
         * {@code replicas}, {@code baseSequenceName}, {@code sortingMapper}, {@code sequenceOffsets}
         * and {@code rowIngestionMetersFactory} arguments are not used.
         */
        @Override
        protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(int replicas, String baseSequenceName, ObjectMapper sortingMapper, TreeMap<Integer, Map<String, String>> sequenceOffsets, SeekableStreamIndexTaskIOConfig taskIoConfig, SeekableStreamIndexTaskTuningConfig taskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) {
            final TestSeekableStreamIndexTask task = new TestSeekableStreamIndexTask(
                "id",
                null,
                SeekableStreamSupervisorStateTest.getDataSchema(),
                taskTuningConfig,
                (SeekableStreamIndexTaskIOConfig<String, String>)taskIoConfig,
                null,
                null
            );
            return ImmutableList.of(task);
        }

        /** Every partition maps to task group 0. */
        @Override
        protected int getTaskGroupIdForPartition(String partition) {
            return 0;
        }

        @Override
        protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata) {
            return true;
        }

        @Override
        protected boolean doesTaskTypeMatchSupervisor(Task task) {
            return true;
        }

        @Override
        protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(String stream, Map<String, String> map) {
            return null;
        }

        /**
         * Wraps {@code seq} in a sequence number whose ordering compares the wrapped strings as
         * {@link BigInteger} values.
         */
        @Override
        protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive) {
            return new OrderedSequenceNumber<String>(seq, isExclusive){

                @Override
                public int compareTo(OrderedSequenceNumber<String> o) {
                    final BigInteger mine = new BigInteger((String)this.get());
                    final BigInteger theirs = new BigInteger((String)o.get());
                    return mine.compareTo(theirs);
                }
            };
        }

        @Override
        protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets) {
            return null;
        }

        @Override
        protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets) {
            return null;
        }

        /** Hands back the enclosing test's {@code recordSupplier}. */
        @Override
        protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() {
            return SeekableStreamSupervisorStateTest.this.recordSupplier;
        }

        /** Returns a fixed payload; {@code numPartitions} and {@code includeOffsets} are not used. */
        @Override
        protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(int numPartitions, boolean includeOffsets) {
            return new SeekableStreamSupervisorReportPayload<String, String>(
                SeekableStreamSupervisorStateTest.DATASOURCE,
                SeekableStreamSupervisorStateTest.STREAM,
                1,
                1,
                1L,
                null,
                null,
                null,
                null,
                null,
                null,
                false,
                true,
                null,
                null,
                null
            ){};
        }

        @Override
        protected String getNotSetMarker() {
            return "NOT_SET";
        }

        @Override
        protected String getEndOfPartitionMarker() {
            return "EOF";
        }

        @Override
        protected boolean isEndOfShard(String seqNum) {
            return false;
        }

        @Override
        protected boolean isShardExpirationMarker(String seqNum) {
            return false;
        }

        @Override
        protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
            return false;
        }
    }

    private class TestSeekableStreamIndexTask
    extends SeekableStreamIndexTask<String, String, ByteEntity> {
        public TestSeekableStreamIndexTask(@Nullable String id, TaskResource taskResource, DataSchema dataSchema, SeekableStreamIndexTaskTuningConfig tuningConfig, @Nullable SeekableStreamIndexTaskIOConfig<String, String> ioConfig, @Nullable Map<String, Object> context, String groupId) {
            super(id, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId);
        }

        protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner() {
            return null;
        }

        protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier() {
            return SeekableStreamSupervisorStateTest.this.recordSupplier;
        }

        public String getType() {
            return "test";
        }
    }
}

