/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SeekableStreamSupervisorSpecTest
extends EasyMockSupport {
    // Collaborators handed to the specs and supervisors under test. setUp() re-creates every
    // instance field before each test; all are EasyMock mocks unless a comment says otherwise.
    private SeekableStreamSupervisorIngestionSpec ingestionSchema;
    private TaskStorage taskStorage;
    private TaskMaster taskMaster;
    private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    private ServiceEmitter emitter;
    private RowIngestionMetersFactory rowIngestionMetersFactory;
    private DataSchema dataSchema;
    private SeekableStreamSupervisorTuningConfig seekableStreamSupervisorTuningConfig;
    private SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig;
    // Shared JSON mapper obtained from TestHelper; separate from the per-test `mapper` field below.
    private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
    private SeekableStreamIndexTaskClientFactory taskClientFactory;
    // Stream and datasource names used by the config/schema helpers and autoscaler constructors in this file.
    private static final String STREAM = "stream";
    private static final String DATASOURCE = "testDS";
    private SeekableStreamSupervisorSpec spec;
    // Real (non-mock) instance created with the no-arg constructor in setUp().
    private SupervisorStateManagerConfig supervisorConfig;
    private SeekableStreamSupervisor supervisor4;
    // Second mock of the same factory type as taskClientFactory.
    private SeekableStreamIndexTaskClientFactory indexTaskClientFactory;
    // Real DefaultObjectMapper; the tests use it to convert property maps into auto scaler configs.
    private ObjectMapper mapper;
    private DruidMonitorSchedulerConfig monitorSchedulerConfig;
    // Mocked counterpart of supervisorConfig.
    private SupervisorStateManagerConfig supervisorStateManagerConfig;

    /**
     * Re-creates every collaborator before each test so that expectations recorded by one
     * test never leak into another.
     */
    @Before
    public void setUp() {
        // Real instances.
        this.supervisorConfig = new SupervisorStateManagerConfig();
        this.mapper = new DefaultObjectMapper();

        // Spec-level mocks.
        this.spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
        this.ingestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
        this.dataSchema = EasyMock.mock(DataSchema.class);
        this.seekableStreamSupervisorTuningConfig = EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
        this.seekableStreamSupervisorIOConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class);

        // Infrastructure mocks.
        this.taskStorage = EasyMock.mock(TaskStorage.class);
        this.taskMaster = EasyMock.mock(TaskMaster.class);
        this.indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class);
        this.emitter = EasyMock.mock(ServiceEmitter.class);
        this.rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
        this.taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
        this.indexTaskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
        this.monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
        this.supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class);
        this.supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
    }

    /**
     * Builds a hand-written supervisor tuning config with fixed values: one worker thread,
     * one chat thread, one chat retry, and ISO-8601 period strings for every timeout.
     *
     * @return a new anonymous {@link SeekableStreamSupervisorTuningConfig} on every call
     */
    private static SeekableStreamSupervisorTuningConfig getTuningConfig() {
        return new SeekableStreamSupervisorTuningConfig() {

            @Override
            public Integer getWorkerThreads() {
                return 1;
            }

            @Override
            public Integer getChatThreads() {
                return 1;
            }

            @Override
            public Long getChatRetries() {
                return 1L;
            }

            @Override
            public Duration getHttpTimeout() {
                return new Period("PT1M").toStandardDuration();
            }

            @Override
            public Duration getShutdownTimeout() {
                return new Period("PT1S").toStandardDuration();
            }

            @Override
            public Duration getRepartitionTransitionDuration() {
                return new Period("PT2M").toStandardDuration();
            }

            @Override
            public Duration getOffsetFetchPeriod() {
                return new Period("PT5M").toStandardDuration();
            }

            /**
             * Returns a task tuning config created with every constructor argument null;
             * its two overridden methods both return null.
             */
            @Override
            public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() {
                return new SeekableStreamIndexTaskTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) {

                    @Override
                    public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) {
                        return null;
                    }

                    @Override
                    public String toString() {
                        return null;
                    }
                };
            }
        };
    }

    /**
     * Checks how {@link AutoScalerConfig} is produced from loosely typed property maps via
     * {@code ObjectMapper.convertValue}: which implementation is selected, which values are
     * carried over, and which property combinations are rejected with a runtime exception.
     */
    @Test
    public void testAutoScalerConfig() {
        // No properties at all: the lag-based implementation is produced with auto scaling off.
        AutoScalerConfig autoScalerConfigEmpty = this.mapper.convertValue(new HashMap<String, Object>(), AutoScalerConfig.class);
        Assert.assertTrue(autoScalerConfigEmpty instanceof LagBasedAutoScalerConfig);
        Assert.assertFalse(autoScalerConfigEmpty.getEnableTaskAutoScaler());

        // A null source converts to a null config.
        AutoScalerConfig autoScalerConfigNull = this.mapper.convertValue(null, AutoScalerConfig.class);
        Assert.assertNull(autoScalerConfigNull);

        // Naming the strategy explicitly selects the same implementation.
        AutoScalerConfig autoScalerConfigDefault = this.mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class);
        Assert.assertTrue(autoScalerConfigDefault instanceof LagBasedAutoScalerConfig);

        // A string-valued numeric property is carried over as a number.
        AutoScalerConfig autoScalerConfigValue = this.mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class);
        Assert.assertTrue(autoScalerConfigValue instanceof LagBasedAutoScalerConfig);
        LagBasedAutoScalerConfig lagBasedAutoScalerConfig = (LagBasedAutoScalerConfig) autoScalerConfigValue;
        // JUnit's contract is assertEquals(expected, actual).
        Assert.assertEquals(1L, lagBasedAutoScalerConfig.getLagCollectionIntervalMillis());

        // Enabled with taskCountMax (1) below taskCountMin (4) must be rejected.
        // Assert.fail throws AssertionError, which the RuntimeException handler does not catch.
        try {
            this.mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
            Assert.fail("Expected a RuntimeException when taskCountMax is smaller than taskCountMin");
        }
        catch (RuntimeException expected) {
            // Expected: the conversion rejected the property combination.
        }

        // Enabled without any task count bounds must be rejected as well.
        try {
            this.mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), AutoScalerConfig.class);
            Assert.fail("Expected a RuntimeException when auto scaling is enabled without task count bounds");
        }
        catch (RuntimeException expected) {
            // Expected: the conversion rejected the property combination.
        }
    }

    /**
     * Verifies which {@link SupervisorTaskAutoScaler} implementation the spec creates for
     * four auto scaler configurations: enabled, explicitly disabled, flag absent, and an
     * entirely empty property map.
     */
    @Test
    public void testAutoScalerCreated() {
        // Values are a mix of Boolean, Integer and Double, so the value type has to be Object.
        Map<String, Object> autoScalerConfig = new HashMap<String, Object>();
        autoScalerConfig.put("enableTaskAutoScaler", true);
        autoScalerConfig.put("lagCollectionIntervalMillis", 500);
        autoScalerConfig.put("lagCollectionRangeMillis", 500);
        autoScalerConfig.put("scaleOutThreshold", 5000000);
        autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
        autoScalerConfig.put("scaleInThreshold", 1000000);
        autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
        autoScalerConfig.put("scaleActionStartDelayMillis", 0);
        autoScalerConfig.put("scaleActionPeriodMillis", 100);
        autoScalerConfig.put("taskCountMax", 8);
        autoScalerConfig.put("taskCountMin", 1);
        autoScalerConfig.put("scaleInStep", 1);
        autoScalerConfig.put("scaleOutStep", 2);
        autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        // Case 1: auto scaling enabled -> lag-based autoscaler.
        EasyMock.expect(this.seekableStreamSupervisorIOConfig.getAutoScalerConfig()).andReturn(this.mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
        EasyMock.replay(this.seekableStreamSupervisorIOConfig);
        EasyMock.expect(this.supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
        EasyMock.replay(this.supervisor4);

        // Named differently from the `spec` field to avoid shadowing it.
        TestSeekableStreamSupervisorSpec supervisorSpec = new TestSeekableStreamSupervisorSpec(this.ingestionSchema, null, false, this.taskStorage, this.taskMaster, this.indexerMetadataStorageCoordinator, this.indexTaskClientFactory, this.mapper, this.emitter, this.monitorSchedulerConfig, this.rowIngestionMetersFactory, this.supervisorStateManagerConfig, this.supervisor4, "id1");
        SupervisorTaskAutoScaler autoscaler = supervisorSpec.createAutoscaler(this.supervisor4);
        Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);

        // Case 2: flag explicitly false -> no-op autoscaler.
        EasyMock.reset(this.seekableStreamSupervisorIOConfig);
        autoScalerConfig.put("enableTaskAutoScaler", false);
        EasyMock.expect(this.seekableStreamSupervisorIOConfig.getAutoScalerConfig()).andReturn(this.mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
        EasyMock.replay(this.seekableStreamSupervisorIOConfig);
        SupervisorTaskAutoScaler autoscaler2 = supervisorSpec.createAutoscaler(this.supervisor4);
        Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler);

        // Case 3: flag missing -> no-op autoscaler.
        EasyMock.reset(this.seekableStreamSupervisorIOConfig);
        autoScalerConfig.remove("enableTaskAutoScaler");
        EasyMock.expect(this.seekableStreamSupervisorIOConfig.getAutoScalerConfig()).andReturn(this.mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
        EasyMock.replay(this.seekableStreamSupervisorIOConfig);
        SupervisorTaskAutoScaler autoscaler3 = supervisorSpec.createAutoscaler(this.supervisor4);
        Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler);

        // Case 4: no properties at all -> no-op autoscaler.
        EasyMock.reset(this.seekableStreamSupervisorIOConfig);
        autoScalerConfig.clear();
        EasyMock.expect(this.seekableStreamSupervisorIOConfig.getAutoScalerConfig()).andReturn(this.mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
        EasyMock.replay(this.seekableStreamSupervisorIOConfig);
        Assert.assertTrue(autoScalerConfig.isEmpty());
        SupervisorTaskAutoScaler autoscaler4 = supervisorSpec.createAutoscaler(this.supervisor4);
        Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
    }

    /**
     * Creates a lag-based autoscaler from a config that specifies only the collection
     * interval, the enable flag and the task count bounds, then checks every getter of the
     * resulting {@link LagBasedAutoScalerConfig}: the supplied values come back as given and
     * the remaining ones carry the values asserted below.
     */
    @Test
    public void testDefaultAutoScalerConfigCreatedWithDefault() {
        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        // Values are mixed String/Boolean, so the map's value type is spelled out explicitly.
        Map<String, Object> properties = ImmutableMap.<String, Object>of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1");
        EasyMock.expect(this.seekableStreamSupervisorIOConfig.getAutoScalerConfig()).andReturn(this.mapper.convertValue(properties, AutoScalerConfig.class)).anyTimes();
        EasyMock.replay(this.seekableStreamSupervisorIOConfig);
        EasyMock.expect(this.supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
        EasyMock.replay(this.supervisor4);

        // Named differently from the `spec` field to avoid shadowing it.
        TestSeekableStreamSupervisorSpec supervisorSpec = new TestSeekableStreamSupervisorSpec(this.ingestionSchema, null, false, this.taskStorage, this.taskMaster, this.indexerMetadataStorageCoordinator, this.indexTaskClientFactory, this.mapper, this.emitter, this.monitorSchedulerConfig, this.rowIngestionMetersFactory, this.supervisorStateManagerConfig, this.supervisor4, "id1");
        SupervisorTaskAutoScaler autoscaler = supervisorSpec.createAutoscaler(this.supervisor4);
        Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);

        LagBasedAutoScalerConfig lagBasedAutoScalerConfig = ((LagBasedAutoScaler) autoscaler).getAutoScalerConfig();
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes
        // failure messages report the right side as "expected".
        Assert.assertEquals(1L, lagBasedAutoScalerConfig.getLagCollectionIntervalMillis());
        Assert.assertEquals(600000L, lagBasedAutoScalerConfig.getLagCollectionRangeMillis());
        Assert.assertEquals(300000L, lagBasedAutoScalerConfig.getScaleActionStartDelayMillis());
        Assert.assertEquals(60000L, lagBasedAutoScalerConfig.getScaleActionPeriodMillis());
        Assert.assertEquals(6000000L, lagBasedAutoScalerConfig.getScaleOutThreshold());
        Assert.assertEquals(1000000L, lagBasedAutoScalerConfig.getScaleInThreshold());
        Assert.assertEquals(4L, lagBasedAutoScalerConfig.getTaskCountMax());
        Assert.assertEquals(1L, lagBasedAutoScalerConfig.getTaskCountMin());
        Assert.assertEquals(1L, lagBasedAutoScalerConfig.getScaleInStep());
        Assert.assertEquals(2L, lagBasedAutoScalerConfig.getScaleOutStep());
        Assert.assertEquals(600000L, lagBasedAutoScalerConfig.getMinTriggerScaleActionFrequencyMillis());
    }

    /**
     * Starts a test supervisor together with a lag-based autoscaler configured from
     * {@code getScaleOutProperties(2)} and expects the supervisor's task count to grow
     * from 1 to 2 within one second.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException {
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(SeekableStreamSupervisorSpecTest.getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(this.getIOConfig(1, true)).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(SeekableStreamSupervisorSpecTest.getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(this.spec);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(this.taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(this.taskMaster);

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
        LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, this.mapper.convertValue(SeekableStreamSupervisorSpecTest.getScaleOutProperties(2), LagBasedAutoScalerConfig.class), this.spec);
        supervisor.start();
        autoScaler.start();
        try {
            supervisor.runInternal();
            int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(1, taskCountBeforeScaleOut);
            // Give the autoscaler time to act (its configured action period is 100 ms).
            Thread.sleep(1000L);
            int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(2, taskCountAfterScaleOut);
        }
        finally {
            // Reset and stop the autoscaler even when an assertion above fails, so a started
            // autoscaler never outlives this test.
            autoScaler.reset();
            autoScaler.stop();
        }
    }

    /**
     * Same scenario as the plain scale-out test, but the supervisor is built with 2 instead
     * of 3 and the autoscaler allows up to 3 tasks. The task count is still expected to end
     * at 2. NOTE(review): presumably the supervisor's constructor argument is its partition
     * count and that is what caps the result; confirm against TestSeekableStreamSupervisor.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException {
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(SeekableStreamSupervisorSpecTest.getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(this.getIOConfig(1, true)).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(SeekableStreamSupervisorSpecTest.getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(this.spec);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(this.taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(this.taskMaster);

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2);
        LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, this.mapper.convertValue(SeekableStreamSupervisorSpecTest.getScaleOutProperties(3), LagBasedAutoScalerConfig.class), this.spec);
        supervisor.start();
        autoScaler.start();
        try {
            supervisor.runInternal();
            int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(1, taskCountBeforeScaleOut);
            // Give the autoscaler time to act (its configured action period is 100 ms).
            Thread.sleep(1000L);
            int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(2, taskCountAfterScaleOut);
        }
        finally {
            // Reset and stop the autoscaler even when an assertion above fails, so a started
            // autoscaler never outlives this test.
            autoScaler.reset();
            autoScaler.stop();
        }
    }

    /**
     * Starts a test supervisor whose task count has been raised to 2, together with a
     * lag-based autoscaler configured from {@code getScaleInProperties()}, and expects the
     * task count to shrink back to 1 within one second.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException {
        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(SeekableStreamSupervisorSpecTest.getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(this.getIOConfig(2, false)).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(SeekableStreamSupervisorSpecTest.getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(this.spec);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(this.taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(this.taskMaster);

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
        LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, this.mapper.convertValue(SeekableStreamSupervisorSpecTest.getScaleInProperties(), LagBasedAutoScalerConfig.class), this.spec);

        // The supervisor reports a task count of 1 at this point; raise it to 2 so there is
        // something to scale in.
        Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
        supervisor.getIoConfig().setTaskCount(2);
        supervisor.start();
        autoScaler.start();
        try {
            supervisor.runInternal();
            int taskCountBeforeScaleIn = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(2, taskCountBeforeScaleIn);
            // Give the autoscaler time to act (its configured action period is 100 ms).
            Thread.sleep(1000L);
            int taskCountAfterScaleIn = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(1, taskCountAfterScaleIn);
        }
        finally {
            // Reset and stop the autoscaler even when an assertion above fails, so a started
            // autoscaler never outlives this test.
            autoScaler.reset();
            autoScaler.stop();
        }
    }

    /**
     * Runs a test supervisor next to a {@link NoopTaskAutoScaler}, with a spec whose IO
     * config is built without an auto scaler config, and expects the task count to stay at
     * 1 after one second.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException {
        // Named differently from the seekableStreamSupervisorIOConfig field (a mock) so the
        // two are not confused; the auto scaler config argument is null here.
        SeekableStreamSupervisorIOConfig ioConfigWithoutAutoScaler = new SeekableStreamSupervisorIOConfig(STREAM, new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), 1, 1, new Period("PT1H"), new Period("P1D"), new Period("PT30S"), false, new Period("PT30M"), null, null, null, null){};

        EasyMock.expect(this.spec.getSupervisorStateManagerConfig()).andReturn(this.supervisorConfig).anyTimes();
        EasyMock.expect(this.spec.getDataSchema()).andReturn(SeekableStreamSupervisorSpecTest.getDataSchema()).anyTimes();
        EasyMock.expect(this.spec.getIoConfig()).andReturn(ioConfigWithoutAutoScaler).anyTimes();
        EasyMock.expect(this.spec.getTuningConfig()).andReturn(SeekableStreamSupervisorSpecTest.getTuningConfig()).anyTimes();
        EasyMock.expect(this.spec.getEmitter()).andReturn(this.emitter).anyTimes();
        EasyMock.expect(this.spec.isSuspended()).andReturn(false).anyTimes();
        EasyMock.replay(this.spec);

        EasyMock.expect(this.ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
        EasyMock.expect(this.ingestionSchema.getDataSchema()).andReturn(this.dataSchema).anyTimes();
        EasyMock.expect(this.ingestionSchema.getTuningConfig()).andReturn(this.seekableStreamSupervisorTuningConfig).anyTimes();
        EasyMock.replay(this.ingestionSchema);

        EasyMock.expect(this.taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
        EasyMock.expect(this.taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
        EasyMock.replay(this.taskMaster);

        TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
        NoopTaskAutoScaler autoScaler = new NoopTaskAutoScaler();
        supervisor.start();
        autoScaler.start();
        try {
            supervisor.runInternal();
            int taskCountBefore = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(1, taskCountBefore);
            // Same wait as the scaling tests, so that any unexpected scaling would have happened.
            Thread.sleep(1000L);
            int taskCountAfter = supervisor.getIoConfig().getTaskCount();
            Assert.assertEquals(1, taskCountAfter);
        }
        finally {
            // Mirror the scaling tests: always reset and stop what was started.
            autoScaler.reset();
            autoScaler.stop();
        }
    }

    /**
     * Builds the data schema used by the supervisor tests: datasource {@code testDS}, an
     * ISO {@code timestamp} column, string dimensions {@code dim1} and {@code dim2}, a
     * {@code rows} count aggregator, and hourly segments with no query granularity.
     *
     * @return a new {@link DataSchema} on every call
     */
    private static DataSchema getDataSchema() {
        // NOTE(review): the element type below was inferred by the decompiler; confirm it
        // matches what the DimensionsSpec constructor actually accepts.
        ArrayList<StringDimensionSchema> dimensions = new ArrayList<StringDimensionSchema>();
        dimensions.add(StringDimensionSchema.create("dim1"));
        dimensions.add(StringDimensionSchema.create("dim2"));

        TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
        DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensions);
        AggregatorFactory[] aggregators = new AggregatorFactory[]{new CountAggregatorFactory("rows")};
        GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, ImmutableList.of());

        return new DataSchema(DATASOURCE, timestampSpec, dimensionsSpec, aggregators, granularitySpec, null);
    }

    /**
     * Builds a supervisor IO config for stream {@code stream} with one replica and the
     * given task count. Only the embedded auto scaler config depends on the flag.
     *
     * @param taskCount task count passed to the IO config
     * @param scaleOut  {@code true} to embed the scale-out properties (task count max 2),
     *                  {@code false} to embed the scale-in properties
     * @return a new anonymous {@link SeekableStreamSupervisorIOConfig} subclass instance
     */
    private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scaleOut) {
        InputFormat inputFormat = new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false);
        Map<String, Object> autoScalerProperties = scaleOut
            ? SeekableStreamSupervisorSpecTest.getScaleOutProperties(2)
            : SeekableStreamSupervisorSpecTest.getScaleInProperties();
        AutoScalerConfig autoScalerConfig = this.mapper.convertValue(autoScalerProperties, AutoScalerConfig.class);
        return new SeekableStreamSupervisorIOConfig(STREAM, inputFormat, 1, taskCount, new Period("PT1H"), new Period("P1D"), new Period("PT30S"), false, new Period("PT30M"), null, null, autoScalerConfig, null){};
    }

    /**
     * Returns auto scaler properties whose scale-out threshold and trigger fraction are both
     * zero, with the scale-in threshold left at 1,000,000.
     *
     * @param maxTaskCount value stored under {@code taskCountMax}
     * @return a new mutable map on every call
     */
    private static Map<String, Object> getScaleOutProperties(int maxTaskCount) {
        Map<String, Object> props = new HashMap<>();
        props.put("enableTaskAutoScaler", Boolean.TRUE);

        // Lag sampling.
        props.put("lagCollectionIntervalMillis", 500);
        props.put("lagCollectionRangeMillis", 500);

        // Thresholds: zero on the scale-out side.
        props.put("scaleOutThreshold", 0);
        props.put("triggerScaleOutFractionThreshold", 0.0);
        props.put("scaleInThreshold", 1000000);
        props.put("triggerScaleInFractionThreshold", 0.8);

        // Scheduling of scale actions.
        props.put("scaleActionStartDelayMillis", 0);
        props.put("scaleActionPeriodMillis", 100);

        // Task count bounds and step sizes.
        props.put("taskCountMax", maxTaskCount);
        props.put("taskCountMin", 1);
        props.put("scaleInStep", 1);
        props.put("scaleOutStep", 2);
        props.put("minTriggerScaleActionFrequencyMillis", 1200000);
        return props;
    }

    /**
     * Returns auto scaler properties whose scale-in threshold and trigger fraction are both
     * zero, with the scale-out threshold at 8,000,000 and the task count bounded to 1..2.
     *
     * @return a new mutable map on every call
     */
    private static Map<String, Object> getScaleInProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("enableTaskAutoScaler", Boolean.TRUE);

        // Lag sampling.
        props.put("lagCollectionIntervalMillis", 500);
        props.put("lagCollectionRangeMillis", 500);

        // Thresholds: zero on the scale-in side.
        props.put("scaleOutThreshold", 8000000);
        props.put("triggerScaleOutFractionThreshold", 0.3);
        props.put("scaleInThreshold", 0);
        props.put("triggerScaleInFractionThreshold", 0.0);

        // Scheduling of scale actions.
        props.put("scaleActionStartDelayMillis", 0);
        props.put("scaleActionPeriodMillis", 100);

        // Task count bounds and step sizes.
        props.put("taskCountMax", 2);
        props.put("taskCountMin", 1);
        props.put("scaleInStep", 1);
        props.put("scaleOutStep", 2);
        props.put("minTriggerScaleActionFrequencyMillis", 1200000);
        return props;
    }

    /**
     * Test-only supervisor spec that returns a caller-supplied supervisor and id.
     *
     * <p>All constructor arguments except {@code supervisor} and {@code id} are forwarded
     * unchanged to the superclass constructor. The remaining accessors are stubs.
     */
    private static class TestSeekableStreamSupervisorSpec
    extends SeekableStreamSupervisorSpec {
        private String id;
        private SeekableStreamSupervisor supervisor;

        public TestSeekableStreamSupervisorSpec(
                SeekableStreamSupervisorIngestionSpec ingestionSchema,
                @Nullable Map<String, Object> context,
                Boolean suspended,
                TaskStorage taskStorage,
                TaskMaster taskMaster,
                IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
                SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
                ObjectMapper mapper,
                ServiceEmitter emitter,
                DruidMonitorSchedulerConfig monitorSchedulerConfig,
                RowIngestionMetersFactory rowIngestionMetersFactory,
                SupervisorStateManagerConfig supervisorStateManagerConfig,
                SeekableStreamSupervisor supervisor,
                String id) {
            super(
                    ingestionSchema,
                    context,
                    suspended,
                    taskStorage,
                    taskMaster,
                    indexerMetadataStorageCoordinator,
                    indexTaskClientFactory,
                    mapper,
                    emitter,
                    monitorSchedulerConfig,
                    rowIngestionMetersFactory,
                    supervisorStateManagerConfig);
            this.id = id;
            this.supervisor = supervisor;
        }

        /** Returns a fresh, empty, mutable list on every call. */
        public List<String> getDataSources() {
            return new ArrayList<>();
        }

        /** Returns the id given to the constructor. */
        public String getId() {
            return id;
        }

        /** Returns the supervisor instance given to the constructor; nothing is created here. */
        public Supervisor createSupervisor() {
            return supervisor;
        }

        /** Stub: always {@code null}. */
        public String getType() {
            return null;
        }

        /** Stub: always {@code null}. */
        public String getSource() {
            return null;
        }

        /** Stub: always {@code null}, regardless of {@code suspend}. */
        protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) {
            return null;
        }
    }

    /**
     * Supervisor stub that reports a fixed partition count and zero lag.
     */
    private class TestSeekableStreamSupervisor
    extends BaseTestSeekableStreamSupervisor {
        private int partitionNumbers;

        /**
         * @param partitionNumbers value later returned by {@link #getPartitionCount()}
         */
        public TestSeekableStreamSupervisor(int partitionNumbers) {
            super();
            this.partitionNumbers = partitionNumbers;
        }

        /** Intentionally does nothing; the supplied executor is left untouched. */
        protected void scheduleReporting(ScheduledExecutorService reportingExec) {
            // no-op
        }

        /** Returns a {@code LagStats} whose three values are all zero. */
        public LagStats computeLagStats() {
            final long noLag = 0L;
            return new LagStats(noLag, noLag, noLag);
        }

        /** Returns the partition count given to the constructor. */
        public int getPartitionCount() {
            return partitionNumbers;
        }
    }

    /**
     * Abstract inner-class base for the test supervisors in this file.
     *
     * <p>It extends {@code SeekableStreamSupervisor<String, String, ByteEntity>} and supplies fixed or
     * stubbed implementations for the methods below, most of which return {@code null}, a constant, or
     * do nothing. Being a non-static inner class, it reads its collaborators from the enclosing
     * {@code SeekableStreamSupervisorSpecTest} instance.
     */
    private abstract class BaseTestSeekableStreamSupervisor
    extends SeekableStreamSupervisor<String, String, ByteEntity> {
        /**
         * Passes the fixed id {@code "testSupervisorId"}, the enclosing test's fields and the shared
         * {@code OBJECT_MAPPER} to the superclass constructor.
         */
        private BaseTestSeekableStreamSupervisor() {
            // NOTE(review): the meaning of the trailing `false` is defined by the superclass
            // constructor, which is not visible here - confirm against SeekableStreamSupervisor.
            super("testSupervisorId", SeekableStreamSupervisorSpecTest.this.taskStorage, SeekableStreamSupervisorSpecTest.this.taskMaster, SeekableStreamSupervisorSpecTest.this.indexerMetadataStorageCoordinator, SeekableStreamSupervisorSpecTest.this.taskClientFactory, OBJECT_MAPPER, SeekableStreamSupervisorSpecTest.this.spec, SeekableStreamSupervisorSpecTest.this.rowIngestionMetersFactory, false);
        }

        /** Returns the constant task-name prefix {@code "test"}. */
        protected String baseTaskName() {
            return "test";
        }

        /** No-op: this stub does not refresh any lag information. */
        protected void updatePartitionLagFromStream() {
        }

        /** Stub: always {@code null}. */
        @Nullable
        protected Map<String, Long> getPartitionRecordLag() {
            return null;
        }

        /** Stub: always {@code null}. */
        @Nullable
        protected Map<String, Long> getPartitionTimeLag() {
            return null;
        }

        /**
         * Builds an anonymous {@code SeekableStreamIndexTaskIOConfig} from the given arguments.
         *
         * <p>Start and end sequence numbers are both created for the test's {@code STREAM} constant;
         * the input format is taken from {@code ioConfig}.
         */
        protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int groupId, Map<String, String> startPartitions, Map<String, String> endPartitions, String baseSequenceName, DateTime minimumMessageTime, DateTime maximumMessageTime, Set<String> exclusiveStartSequenceNumberPartitions, SeekableStreamSupervisorIOConfig ioConfig) {
            // NOTE(review): the literal `true` is presumably the "use transaction" flag - confirm
            // against the SeekableStreamIndexTaskIOConfig constructor.
            return new SeekableStreamIndexTaskIOConfig<String, String>(Integer.valueOf(groupId), baseSequenceName, new SeekableStreamStartSequenceNumbers(SeekableStreamSupervisorSpecTest.STREAM, startPartitions, exclusiveStartSequenceNumberPartitions), new SeekableStreamEndSequenceNumbers(SeekableStreamSupervisorSpecTest.STREAM, endPartitions), Boolean.valueOf(true), minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat()){};
        }

        /** Stub: always {@code null}; no tasks are created. */
        protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(int replicas, String baseSequenceName, ObjectMapper sortingMapper, TreeMap<Integer, Map<String, String>> sequenceOffsets, SeekableStreamIndexTaskIOConfig taskIoConfig, SeekableStreamIndexTaskTuningConfig taskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) {
            return null;
        }

        /** Maps every partition to task group {@code 0}. */
        protected int getTaskGroupIdForPartition(String partition) {
            return 0;
        }

        /** Always {@code true}; the metadata argument is not inspected. */
        protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata) {
            return true;
        }

        /** Always {@code true}; the task argument is not inspected. */
        protected boolean doesTaskTypeMatchSupervisor(Task task) {
            return true;
        }

        /** Stub: always {@code null}. */
        protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(String stream, Map<String, String> map) {
            return null;
        }

        /**
         * Wraps {@code seq} in an anonymous {@code OrderedSequenceNumber} that is ordered numerically.
         *
         * <p>Comparison parses both values with {@code new BigInteger(String)}, so it throws
         * {@code NumberFormatException} if either value is not a valid integer string.
         */
        protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive) {
            return new OrderedSequenceNumber<String>(seq, isExclusive){

                public int compareTo(OrderedSequenceNumber<String> o) {
                    return new BigInteger((String)this.get()).compareTo(new BigInteger((String)o.get()));
                }
            };
        }

        /** Stub: always {@code null}; {@code currentOffsets} is ignored. */
        protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets) {
            return null;
        }

        /** Stub: always {@code null}; {@code currentOffsets} is ignored. */
        protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets) {
            return null;
        }

        /** Returns the current value of {@code this.recordSupplier} without creating anything. */
        protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() {
            // NOTE(review): `recordSupplier` is not declared in this class, so it is presumably a
            // field inherited from SeekableStreamSupervisor - confirm.
            return this.recordSupplier;
        }

        /**
         * Returns an anonymous report payload built from fixed values for the test's
         * {@code DATASOURCE} and {@code STREAM}; both parameters are ignored.
         */
        protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(int numPartitions, boolean includeOffsets) {
            return new SeekableStreamSupervisorReportPayload<String, String>(SeekableStreamSupervisorSpecTest.DATASOURCE, SeekableStreamSupervisorSpecTest.STREAM, 1, 1, 1L, null, null, null, null, null, null, false, true, null, null, null){};
        }

        /** Returns the marker string {@code "NOT_SET"}. */
        protected String getNotSetMarker() {
            return "NOT_SET";
        }

        /** Returns the marker string {@code "EOF"}. */
        protected String getEndOfPartitionMarker() {
            return "EOF";
        }

        /** Always {@code false}; {@code seqNum} is not inspected. */
        protected boolean isEndOfShard(String seqNum) {
            return false;
        }

        /** Always {@code false}; {@code seqNum} is not inspected. */
        protected boolean isShardExpirationMarker(String seqNum) {
            return false;
        }

        /** Always {@code false}. */
        protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
            return false;
        }
    }
}

