/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.tests.indexer;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.utils.DruidClusterAdminClient;
import org.apache.druid.testing.utils.EventSerializer;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.JsonEventSerializer;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.Assert;

public abstract class AbstractStreamIndexingTest
extends AbstractIndexerTest {
    static final DateTime FIRST_EVENT_TIME = DateTimes.of((int)1994, (int)4, (int)29, (int)1, (int)0);
    static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern((String)"yyyy-MM-dd'T'HH:mm:'00Z'");
    static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern((String)"yyyy-MM-dd'T'HH:mm:ss'.000Z'");
    static final int EVENTS_PER_SECOND = 6;
    static final int TOTAL_NUMBER_OF_SECOND = 10;
    private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
    private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
    private static final int STREAM_SHARD_COUNT = 2;
    protected static final long CYCLE_PADDING_MS = 100L;
    private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
    private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
    private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
    private static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE = "supervisor_with_idle_behaviour_enabled_spec_template.json";
    protected static final String DATA_RESOURCE_ROOT = "/stream/data";
    protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH = String.join((CharSequence)"/", "/stream/data", "supervisor_spec_template.json");
    protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH = String.join((CharSequence)"/", "/stream/data", "supervisor_with_autoscaler_spec_template.json");
    protected static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH = String.join((CharSequence)"/", "/stream/data", "supervisor_with_idle_behaviour_enabled_spec_template.json");
    protected static final String SERIALIZER_SPEC_DIR = "serializer";
    protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
    protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
    protected static final String SERIALIZER = "serializer";
    protected static final String INPUT_FORMAT = "inputFormat";
    protected static final String INPUT_ROW_PARSER = "parser";
    protected static final String JSON_INPUT_FORMAT_PATH = String.join((CharSequence)"/", "/stream/data", "json", "input_format", "input_format.json");
    protected static final List<String> DEFAULT_DIMENSIONS = ImmutableList.of((Object)"page", (Object)"language", (Object)"user", (Object)"unpatrolled", (Object)"newPage", (Object)"robot", (Object)"anonymous", (Object)"namespace", (Object)"continent", (Object)"country", (Object)"region", (Object)"city", (Object[])new String[0]);
    @Inject
    private DruidClusterAdminClient druidClusterAdminClient;
    private StreamAdminClient streamAdminClient;

    abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig var1) throws Exception;

    abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig var1, @Nullable Boolean var2) throws Exception;

    abstract Function<String, String> generateStreamIngestionPropsTransform(String var1, String var2, String var3, String var4, List<String> var5, IntegrationTestingConfig var6);

    abstract Function<String, String> generateStreamQueryPropsTransform(String var1, String var2);

    public abstract String getTestNamePrefix();

    protected void doBeforeClass() throws Exception {
        this.streamAdminClient = this.createStreamAdminClient(this.config);
    }

    private static String getOnlyResourcePath(String resourceRoot) throws IOException {
        return String.join((CharSequence)"/", resourceRoot, (CharSequence)Iterables.getOnlyElement(AbstractStreamIndexingTest.listResources(resourceRoot)));
    }

    protected static List<String> listDataFormatResources() throws IOException {
        return AbstractStreamIndexingTest.listResources(DATA_RESOURCE_ROOT).stream().filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource)).filter(resource -> !SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE.equals(resource)).collect(Collectors.toList());
    }

    protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException {
        List<String> specDirs = AbstractStreamIndexingTest.listResources(resourceRoot);
        HashMap<String, String> map = new HashMap<String, String>();
        for (String eachSpec : specDirs) {
            if ("serializer".equals(eachSpec)) {
                map.put("serializer", AbstractStreamIndexingTest.getOnlyResourcePath(String.join((CharSequence)"/", resourceRoot, "serializer")));
                continue;
            }
            if ("parser".equals(eachSpec)) {
                map.put("parser", AbstractStreamIndexingTest.getOnlyResourcePath(String.join((CharSequence)"/", resourceRoot, "parser")));
                continue;
            }
            if (!INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) continue;
            map.put(INPUT_FORMAT, AbstractStreamIndexingTest.getOnlyResourcePath(String.join((CharSequence)"/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
        }
        if (!map.containsKey("serializer")) {
            throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", new Object[]{resourceRoot, map});
        }
        if (map.size() == 1) {
            throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", new Object[]{resourceRoot, map});
        }
        return map;
    }

    protected Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig) {
        return Closer.create().register(() -> this.doMethodTeardown(generatedTestConfig));
    }

    protected void doTestIndexDataStableState(@Nullable Boolean transactionEnabled, String serializerPath, String parserType, String specPath) throws Exception {
        EventSerializer serializer = (EventSerializer)this.jsonMapper.readValue(AbstractStreamIndexingTest.getResourceAsStream(serializerPath), EventSerializer.class);
        WikipediaStreamEventStreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 100L);
        GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, AbstractStreamIndexingTest.getResourceAsString(specPath));
        try (Closeable closer = this.createResourceCloser(generatedTestConfig);
             StreamEventWriter streamEventWriter = this.createStreamEventWriter(this.config, transactionEnabled);){
            String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec: [%s]\n", new Object[]{taskSpec});
            generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(taskSpec));
            LOG.info("Submitted supervisor", new Object[0]);
            long numWritten = streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, 10, FIRST_EVENT_TIME);
            this.verifyIngestedData(generatedTestConfig, numWritten);
        }
    }

    void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) throws Exception {
        this.testIndexWithLosingNodeHelper(() -> this.druidClusterAdminClient.restartCoordinatorContainer(), () -> this.druidClusterAdminClient.waitUntilCoordinatorReady(), transactionEnabled);
    }

    void doTestIndexDataWithLosingOverlord(@Nullable Boolean transactionEnabled) throws Exception {
        this.testIndexWithLosingNodeHelper(() -> this.druidClusterAdminClient.restartOverlordContainer(), () -> this.druidClusterAdminClient.waitUntilIndexerReady(), transactionEnabled);
    }

    void doTestIndexDataWithLosingHistorical(@Nullable Boolean transactionEnabled) throws Exception {
        this.testIndexWithLosingNodeHelper(() -> this.druidClusterAdminClient.restartHistoricalContainer(), () -> this.druidClusterAdminClient.waitUntilHistoricalReady(), transactionEnabled);
    }

    protected void doTestIndexDataWithStartStopSupervisor(@Nullable Boolean transactionEnabled) throws Exception {
        GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(INPUT_FORMAT, AbstractStreamIndexingTest.getResourceAsString(JSON_INPUT_FORMAT_PATH));
        try (Closeable closer = this.createResourceCloser(generatedTestConfig);
             StreamEventWriter streamEventWriter = this.createStreamEventWriter(this.config, transactionEnabled);){
            String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec: [%s]\n", new Object[]{taskSpec});
            generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(taskSpec));
            LOG.info("Submitted supervisor", new Object[0]);
            int secondsToGenerateRemaining = 10;
            int secondsToGenerateFirstRound = 5;
            WikipediaStreamEventStreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator((EventSerializer)new JsonEventSerializer(this.jsonMapper), 6, 100L);
            long numWritten = streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            this.indexer.suspendSupervisor(generatedTestConfig.getSupervisorId());
            this.indexer.resumeSupervisor(generatedTestConfig.getSupervisorId());
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            this.verifyIngestedData(generatedTestConfig, numWritten += streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining -= secondsToGenerateFirstRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)));
        }
    }

    protected void doTestIndexDataWithAutoscaler(@Nullable Boolean transactionEnabled) throws Exception {
        GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(INPUT_FORMAT, AbstractStreamIndexingTest.getResourceAsString(JSON_INPUT_FORMAT_PATH));
        try (Closeable closer = this.createResourceCloser(generatedTestConfig);
             StreamEventWriter streamEventWriter = this.createStreamEventWriter(this.config, transactionEnabled);){
            String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec: [%s]\n", new Object[]{taskSpec});
            generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(taskSpec));
            LOG.info("Submitted supervisor", new Object[0]);
            String dataSource = generatedTestConfig.getFullDatasourceName();
            int secondsToGenerateRemaining = 10;
            int secondsToGenerateFirstRound = 5;
            WikipediaStreamEventStreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator((EventSerializer)new JsonEventSerializer(this.jsonMapper), 6, 100L);
            long numWritten = streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            ITRetryUtil.retryUntil(() -> this.indexer.getRunningTasks().stream().filter(taskResponseObject -> taskResponseObject.getId().contains(dataSource)).count() == 2L, (boolean)true, (long)10000L, (int)50, (String)"waiting for autoScaling task numbers from 1 to 2");
            this.verifyIngestedData(generatedTestConfig, numWritten += streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining -= secondsToGenerateFirstRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)));
        }
    }

    protected void doTestIndexDataWithIdleConfigEnabled(@Nullable Boolean transactionEnabled) throws Exception {
        GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(INPUT_FORMAT, AbstractStreamIndexingTest.getResourceAsString(JSON_INPUT_FORMAT_PATH));
        try (Closeable closer = this.createResourceCloser(generatedTestConfig);
             StreamEventWriter streamEventWriter = this.createStreamEventWriter(this.config, transactionEnabled);){
            String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec: [%s]\n", new Object[]{taskSpec});
            generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(taskSpec));
            LOG.info("Submitted supervisor", new Object[0]);
            String dataSource = generatedTestConfig.getFullDatasourceName();
            int secondsToGenerateRemaining = 10;
            int secondsToGenerateFirstRound = 5;
            secondsToGenerateRemaining -= secondsToGenerateFirstRound;
            WikipediaStreamEventStreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator((EventSerializer)new JsonEventSerializer(this.jsonMapper), 6, 100L);
            long numWritten = streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.IDLE.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be idle");
            ITRetryUtil.retryUntil(() -> this.indexer.getRunningTasks().stream().noneMatch(taskResponseObject -> taskResponseObject.getId().contains(dataSource)), (boolean)true, (long)10000L, (int)50, (String)"wait for no more creation of indexing tasks");
            this.verifyIngestedData(generatedTestConfig, numWritten += streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)));
        }
    }

    protected void doTestTerminatedSupervisorAutoCleanup(@Nullable Boolean transactionEnabled) throws Exception {
        GeneratedTestConfig generatedTestConfig1 = new GeneratedTestConfig(INPUT_FORMAT, AbstractStreamIndexingTest.getResourceAsString(JSON_INPUT_FORMAT_PATH));
        GeneratedTestConfig generatedTestConfig2 = new GeneratedTestConfig(INPUT_FORMAT, AbstractStreamIndexingTest.getResourceAsString(JSON_INPUT_FORMAT_PATH));
        try (Closeable closer1 = this.createResourceCloser(generatedTestConfig1);
             Closeable closer2 = this.createResourceCloser(generatedTestConfig2);){
            String taskSpec1 = generatedTestConfig1.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec1: [%s]\n", new Object[]{taskSpec1});
            String taskSpec2 = generatedTestConfig2.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec2: [%s]\n", new Object[]{taskSpec2});
            generatedTestConfig1.setSupervisorId(this.indexer.submitSupervisor(taskSpec1));
            generatedTestConfig2.setSupervisorId(this.indexer.submitSupervisor(taskSpec2));
            LOG.info("Submitted supervisors", new Object[0]);
            Thread.sleep(10000L);
            List specs1 = this.indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
            Assert.assertNotNull((Object)specs1);
            Assert.assertFalse((boolean)specs1.isEmpty());
            List specs2 = this.indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId());
            Assert.assertNotNull((Object)specs2);
            Assert.assertFalse((boolean)specs2.isEmpty());
            LOG.info("Terminating supervisor 2", new Object[0]);
            this.indexer.terminateSupervisor(generatedTestConfig2.getSupervisorId());
            ITRetryUtil.retryUntil(() -> {
                try {
                    this.indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId());
                    LOG.warn("Supervisor history should not exist", new Object[0]);
                    return false;
                }
                catch (ISE e) {
                    if (e.getMessage().contains("Not Found")) {
                        return true;
                    }
                    throw e;
                }
            }, (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor spec 2 to be auto clean");
            specs1 = this.indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
            Assert.assertNotNull((Object)specs1);
            Assert.assertFalse((boolean)specs1.isEmpty());
        }
    }

    protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception {
        this.testIndexWithStreamReshardHelper(transactionEnabled, 4);
    }

    protected void doTestIndexDataWithStreamReshardMerge() throws Exception {
        this.testIndexWithStreamReshardHelper(null, 1);
    }

    private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable, @Nullable Boolean transactionEnabled) throws Exception {
        GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(INPUT_FORMAT, AbstractStreamIndexingTest.getResourceAsString(JSON_INPUT_FORMAT_PATH));
        try (Closeable closer = this.createResourceCloser(generatedTestConfig);
             StreamEventWriter streamEventWriter = this.createStreamEventWriter(this.config, transactionEnabled);){
            String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec: [%s]\n", new Object[]{taskSpec});
            generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(taskSpec));
            LOG.info("Submitted supervisor", new Object[0]);
            int secondsToGenerateRemaining = 10;
            int secondsToGenerateFirstRound = 3;
            secondsToGenerateRemaining -= secondsToGenerateFirstRound;
            WikipediaStreamEventStreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator((EventSerializer)new JsonEventSerializer(this.jsonMapper), 6, 100L);
            long numWritten = streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            LOG.info("Restarting Druid process", new Object[0]);
            restartRunnable.run();
            LOG.info("Restarted Druid process", new Object[0]);
            int secondsToGenerateSecondRound = 3;
            numWritten += streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
            LOG.info("Waiting for Druid process to be available", new Object[0]);
            waitForReadyRunnable.run();
            LOG.info("Druid process is now available", new Object[0]);
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            this.verifyIngestedData(generatedTestConfig, numWritten += streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining -= secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)));
        }
    }

    private void testIndexWithStreamReshardHelper(@Nullable Boolean transactionEnabled, int newShardCount) throws Exception {
        GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(INPUT_FORMAT, AbstractStreamIndexingTest.getResourceAsString(JSON_INPUT_FORMAT_PATH));
        Closeable closer = this.createResourceCloser(generatedTestConfig);
        Object object = null;
        try (StreamEventWriter streamEventWriter2 = this.createStreamEventWriter(this.config, transactionEnabled);){
            String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
            LOG.info("supervisorSpec: [%s]\n", new Object[]{taskSpec});
            generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(taskSpec));
            LOG.info("Submitted supervisor", new Object[0]);
            int secondsToGenerateRemaining = 10;
            int secondsToGenerateFirstRound = 3;
            secondsToGenerateRemaining -= secondsToGenerateFirstRound;
            WikipediaStreamEventStreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator((EventSerializer)new JsonEventSerializer(this.jsonMapper), 6, 100L);
            long numWritten = streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter2, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            this.streamAdminClient.updatePartitionCount(generatedTestConfig.getStreamName(), newShardCount, true);
            int secondsToGenerateSecondRound = 3;
            numWritten += streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter2, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
            ITRetryUtil.retryUntil(() -> this.streamAdminClient.isStreamActive(generatedTestConfig.getStreamName()), (boolean)true, (long)10000L, (int)30, (String)"Waiting for stream to finish resharding");
            ITRetryUtil.retryUntil(() -> this.streamAdminClient.verfiyPartitionCountUpdated(generatedTestConfig.getStreamName(), 2, newShardCount), (boolean)true, (long)10000L, (int)30, (String)"Waiting for stream to finish resharding");
            ITRetryUtil.retryUntil(() -> SupervisorStateManager.BasicState.RUNNING.equals((Object)this.indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), (boolean)true, (long)10000L, (int)30, (String)"Waiting for supervisor to be healthy");
            this.verifyIngestedData(generatedTestConfig, numWritten += streamGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter2, secondsToGenerateRemaining -= secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)));
        }
        catch (Throwable streamEventWriter2) {
            object = streamEventWriter2;
            throw streamEventWriter2;
        }
        finally {
            if (closer != null) {
                if (object != null) {
                    try {
                        closer.close();
                    }
                    catch (Throwable streamEventWriter2) {
                        ((Throwable)object).addSuppressed(streamEventWriter2);
                    }
                } else {
                    closer.close();
                }
            }
        }
        List completedTasks = this.indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName());
        for (TaskResponseObject task : completedTasks) {
            try {
                RowIngestionMetersTotals stats = this.indexer.getTaskStats(task.getId());
                Assert.assertEquals((long)0L, (long)stats.getThrownAway());
            }
            catch (Exception e) {
                if (task.getStatus().isFailure()) continue;
                throw e;
            }
        }
    }

    protected void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception {
        LOG.info("Waiting for stream indexing tasks to consume events", new Object[0]);
        ITRetryUtil.retryUntilTrue(() -> numWritten == (long)this.queryHelper.countRows(generatedTestConfig.getFullDatasourceName(), Intervals.ETERNITY, name -> new LongSumAggregatorFactory(name, "count")), (String)StringUtils.format((String)"dataSource[%s] consumed [%,d] events, expected [%,d]", (Object[])new Object[]{generatedTestConfig.getFullDatasourceName(), this.queryHelper.countRows(generatedTestConfig.getFullDatasourceName(), Intervals.ETERNITY, name -> new LongSumAggregatorFactory(name, "count")), numWritten}));
        String querySpec = generatedTestConfig.getStreamQueryPropsTransform().apply(AbstractStreamIndexingTest.getResourceAsString(QUERIES_FILE));
        this.queryHelper.testQueriesFromString(querySpec);
        LOG.info("Waiting for all indexing tasks to finish", new Object[0]);
        ITRetryUtil.retryUntilTrue(() -> this.indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size() > 0, (String)"Waiting for Task Completion");
        ITRetryUtil.retryUntilTrue(() -> this.coordinator.areSegmentsLoaded(generatedTestConfig.getFullDatasourceName()), (String)"Real-time generated segments loaded");
        this.queryHelper.testQueriesFromString(querySpec);
    }

    long getSumOfEventSequence(int numEvents) {
        return numEvents * (1 + numEvents) / 2;
    }

    private void doMethodTeardown(GeneratedTestConfig generatedTestConfig) {
        if (generatedTestConfig.getSupervisorId() != null) {
            try {
                LOG.info("Terminating supervisor", new Object[0]);
                this.indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
                List runningTasks = this.indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName());
                for (TaskResponseObject task : runningTasks) {
                    this.indexer.shutdownTask(task.getId());
                }
            }
            catch (Exception e) {
                LOG.warn((Throwable)e, "Failed to cleanup supervisor. This might be expected depending on the test method", new Object[0]);
            }
        }
        try {
            this.unloader(generatedTestConfig.getFullDatasourceName());
        }
        catch (Exception e) {
            LOG.warn((Throwable)e, "Failed to cleanup datasource. This might be expected depending on the test method", new Object[0]);
        }
        try {
            this.streamAdminClient.deleteStream(generatedTestConfig.getStreamName());
        }
        catch (Exception e) {
            LOG.warn((Throwable)e, "Failed to cleanup stream. This might be expected depending on the test method", new Object[0]);
        }
    }

    protected class GeneratedTestConfig {
        private final String streamName;
        private final String fullDatasourceName;
        private String supervisorId;
        private Function<String, String> streamIngestionPropsTransform;
        private Function<String, String> streamQueryPropsTransform;

        public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception {
            this(parserType, parserOrInputFormat, DEFAULT_DIMENSIONS);
        }

        public GeneratedTestConfig(String parserType, String parserOrInputFormat, List<String> dimensions) throws Exception {
            this.streamName = AbstractStreamIndexingTest.this.getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
            String datasource = AbstractStreamIndexingTest.this.getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
            ImmutableMap tags = ImmutableMap.of((Object)AbstractStreamIndexingTest.STREAM_EXPIRE_TAG, (Object)Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
            AbstractStreamIndexingTest.this.streamAdminClient.createStream(this.streamName, 2, (Map)tags);
            ITRetryUtil.retryUntil(() -> AbstractStreamIndexingTest.this.streamAdminClient.isStreamActive(this.streamName), (boolean)true, (long)10000L, (int)30, (String)"Wait for stream active");
            this.fullDatasourceName = datasource + AbstractStreamIndexingTest.this.config.getExtraDatasourceNameSuffix();
            this.streamIngestionPropsTransform = AbstractStreamIndexingTest.this.generateStreamIngestionPropsTransform(this.streamName, this.fullDatasourceName, parserType, parserOrInputFormat, dimensions, AbstractStreamIndexingTest.this.config);
            this.streamQueryPropsTransform = AbstractStreamIndexingTest.this.generateStreamQueryPropsTransform(this.streamName, this.fullDatasourceName);
        }

        public String getSupervisorId() {
            return this.supervisorId;
        }

        public void setSupervisorId(String supervisorId) {
            this.supervisorId = supervisorId;
        }

        public String getStreamName() {
            return this.streamName;
        }

        public String getFullDatasourceName() {
            return this.fullDatasourceName;
        }

        public Function<String, String> getStreamIngestionPropsTransform() {
            return this.streamIngestionPropsTransform;
        }

        public Function<String, String> getStreamQueryPropsTransform() {
            return this.streamQueryPropsTransform;
        }
    }
}

