/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.tests.coordinator.duty;

import com.google.inject.Inject;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.CompactionUtil;
import org.apache.druid.testing.utils.EventSerializer;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testing.utils.StreamGenerator;
import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Test(groups={"compaction"})
@Guice(moduleFactory=DruidTestModuleFactory.class)
public class ITAutoCompactionLockContentionTest
extends AbstractKafkaIndexingServiceTest {
    private static final Logger LOG = new Logger(ITAutoCompactionLockContentionTest.class);
    @Inject
    private CompactionResourceTestClient compactionResource;
    private AbstractStreamIndexingTest.GeneratedTestConfig generatedTestConfig;
    private StreamGenerator streamGenerator;
    private String fullDatasourceName;

    @DataProvider
    public static Object[] getParameters() {
        return new Object[]{false, true};
    }

    @BeforeClass
    public void setupClass() throws Exception {
        this.doBeforeClass();
    }

    @BeforeMethod
    public void setup() throws Exception {
        this.generatedTestConfig = new AbstractStreamIndexingTest.GeneratedTestConfig(this, "inputFormat", ITAutoCompactionLockContentionTest.getResourceAsString("/stream/data/csv/input_format/input_format.json"));
        this.fullDatasourceName = this.generatedTestConfig.getFullDatasourceName();
        EventSerializer serializer = (EventSerializer)this.jsonMapper.readValue(ITAutoCompactionLockContentionTest.getResourceAsStream("/stream/data/csv/serializer/serializer.json"), EventSerializer.class);
        this.streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 100L);
    }

    @Override
    public String getTestNamePrefix() {
        return "autocompact_lock_contention";
    }

    @Test(dataProvider="getParameters")
    public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) throws Exception {
        if (this.shouldSkipTest(transactionEnabled)) {
            return;
        }
        try (Closeable closer = this.createResourceCloser(this.generatedTestConfig);
             StreamEventWriter streamEventWriter = this.createStreamEventWriter(this.config, transactionEnabled);){
            String taskSpec = this.generatedTestConfig.getStreamIngestionPropsTransform().apply(ITAutoCompactionLockContentionTest.getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
            this.generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(taskSpec));
            LOG.info("supervisorSpec: [%s]", new Object[]{taskSpec});
            Interval minute1 = Intervals.of((String)"2000-01-01T01:01:00Z/2000-01-01T01:02:00Z");
            long rowsForMinute1 = this.generateData(minute1, streamEventWriter);
            Interval minute2 = Intervals.of((String)"2000-01-01T01:02:00Z/2000-01-01T01:03:00Z");
            long rowsForMinute2 = this.generateData(minute2, streamEventWriter);
            Interval minute3 = Intervals.of((String)"2000-01-01T01:03:00Z/2000-01-01T01:04:00Z");
            long rowsForMinute3 = this.generateData(minute3, streamEventWriter);
            this.ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
            this.ensureLockedIntervals(new Interval[0]);
            this.ensureSegmentsLoaded();
            this.ensureSegmentsCount(6);
            this.ensureLockedIntervals(minute2);
            this.submitAndVerifyCompactionConfig();
            this.compactionResource.forceTriggerAutoCompaction();
            this.ensureRowCount(rowsForMinute1 + (rowsForMinute2 += this.generateData(minute2, streamEventWriter)) + rowsForMinute3);
            this.ensureLockedIntervals(new Interval[0]);
            this.ensureSegmentsLoaded();
            this.ensureCompactionTaskCount(2);
            this.verifyCompactedIntervals(minute1, minute3);
            this.compactionResource.forceTriggerAutoCompaction();
            this.ensureCompactionTaskCount(3);
            this.ensureSegmentsLoaded();
            this.verifyCompactedIntervals(minute1, minute2, minute3);
            this.ensureSegmentsCount(3);
        }
    }

    private void ensureSegmentsCount(int numExpectedSegments) {
        ITRetryUtil.retryUntilTrue(() -> {
            List segments = this.coordinator.getFullSegmentsMetadata(this.fullDatasourceName);
            StringBuilder sb = new StringBuilder();
            segments.forEach(seg -> sb.append("{").append(seg.getId()).append(", ").append(seg.getSize()).append("}, "));
            LOG.info("Found Segments: %s", new Object[]{sb});
            LOG.info("Current metadata segment count: %d, expected: %d", new Object[]{segments.size(), numExpectedSegments});
            return segments.size() == numExpectedSegments;
        }, (String)"Segment count check");
    }

    private void verifyCompactedIntervals(Interval ... compactedIntervals) {
        List segments = this.coordinator.getFullSegmentsMetadata(this.fullDatasourceName);
        ArrayList<DataSegment> observedCompactedSegments = new ArrayList<DataSegment>();
        HashSet<Interval> observedCompactedIntervals = new HashSet<Interval>();
        for (DataSegment segment : segments) {
            if (segment.getLastCompactionState() == null) continue;
            observedCompactedSegments.add(segment);
            observedCompactedIntervals.add(segment.getInterval());
        }
        HashSet<Interval> expectedCompactedIntervals = new HashSet<Interval>(Arrays.asList(compactedIntervals));
        Assert.assertEquals(observedCompactedIntervals, expectedCompactedIntervals);
        DynamicPartitionsSpec expectedPartitionSpec = new DynamicPartitionsSpec(Integer.valueOf(10000), Long.valueOf(Long.MAX_VALUE));
        for (DataSegment compactedSegment : observedCompactedSegments) {
            Assert.assertNotNull((Object)compactedSegment.getLastCompactionState());
            Assert.assertEquals((Object)compactedSegment.getLastCompactionState().getPartitionsSpec(), (Object)expectedPartitionSpec);
        }
    }

    private long generateData(Interval interval, StreamEventWriter streamEventWriter) {
        long rowCount = this.streamGenerator.run(this.generatedTestConfig.getStreamName(), streamEventWriter, 10, interval.getStart());
        LOG.info("Generated %d Rows for Interval [%s]", new Object[]{rowCount, interval});
        return rowCount;
    }

    private void ensureSegmentsLoaded() {
        ITRetryUtil.retryUntilTrue(() -> this.coordinator.areSegmentsLoaded(this.fullDatasourceName), (String)"Segment Loading");
    }

    private void ensureLockedIntervals(Interval ... intervals) {
        Map<String, Integer> minTaskPriority = Collections.singletonMap(this.fullDatasourceName, 0);
        ArrayList lockedIntervals = new ArrayList();
        ITRetryUtil.retryUntilTrue(() -> {
            lockedIntervals.clear();
            Map allIntervals = this.indexer.getLockedIntervals(minTaskPriority);
            if (allIntervals.containsKey(this.fullDatasourceName)) {
                lockedIntervals.addAll((Collection)allIntervals.get(this.fullDatasourceName));
            }
            LOG.info("Locked intervals: %s", new Object[]{lockedIntervals});
            return intervals.length == lockedIntervals.size();
        }, (String)"Verify Locked Intervals");
        Assert.assertEquals(lockedIntervals, Arrays.asList(intervals));
    }

    private boolean shouldSkipTest(boolean testEnableTransaction) {
        Map kafkaTestProps = KafkaUtil.getAdditionalKafkaTestConfigFromProperties((IntegrationTestingConfig)this.config);
        boolean configEnableTransaction = Boolean.parseBoolean(kafkaTestProps.getOrDefault("transactionEnabled", "false"));
        return configEnableTransaction != testEnableTransaction;
    }

    private void submitAndVerifyCompactionConfig() throws Exception {
        DataSourceCompactionConfig compactionConfig = CompactionUtil.createCompactionConfig((String)this.fullDatasourceName, (Integer)10000, (Period)Period.ZERO);
        this.compactionResource.updateCompactionTaskSlot(Double.valueOf(0.5), Integer.valueOf(10), null);
        this.compactionResource.submitCompactionConfig(compactionConfig);
        Thread.sleep(2000L);
        CoordinatorCompactionConfig coordinatorCompactionConfig = this.compactionResource.getCoordinatorCompactionConfigs();
        DataSourceCompactionConfig observedCompactionConfig = null;
        for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
            if (!dataSourceCompactionConfig.getDataSource().equals(this.fullDatasourceName)) continue;
            observedCompactionConfig = dataSourceCompactionConfig;
        }
        Assert.assertEquals(observedCompactionConfig, (Object)compactionConfig);
        observedCompactionConfig = this.compactionResource.getDataSourceCompactionConfig(this.fullDatasourceName);
        Assert.assertEquals((Object)observedCompactionConfig, (Object)compactionConfig);
    }

    private boolean isCompactionTask(TaskResponseObject taskResponse) {
        return "compact".equalsIgnoreCase(taskResponse.getType());
    }

    private void ensureCompactionTaskCount(int expectedCount) {
        LOG.info("Verifying compaction task count. Expected: %d", new Object[]{expectedCount});
        ITRetryUtil.retryUntilTrue(() -> this.getCompactionTaskCount() == (long)expectedCount, (String)"Compaction Task Count");
    }

    private long getCompactionTaskCount() {
        List incompleteTasks = this.indexer.getUncompletedTasksForDataSource(this.fullDatasourceName);
        List completeTasks = this.indexer.getCompleteTasksForDataSource(this.fullDatasourceName);
        this.printTasks(incompleteTasks, "Incomplete");
        this.printTasks(completeTasks, "Complete");
        return completeTasks.stream().filter(this::isCompactionTask).count();
    }

    private void printTasks(List<TaskResponseObject> tasks, String taskState) {
        StringBuilder sb = new StringBuilder();
        tasks.forEach(task -> sb.append("{").append(task.getType()).append(", ").append(task.getStatus()).append(", ").append(task.getCreatedTime()).append("}, "));
        LOG.info("%s Tasks: %s", new Object[]{taskState, sb});
    }

    private void ensureRowCount(long totalRows) {
        LOG.info("Verifying Row Count. Expected: %s", new Object[]{totalRows});
        ITRetryUtil.retryUntilTrue(() -> totalRows == (long)this.queryHelper.countRows(this.fullDatasourceName, Intervals.ETERNITY, name -> new LongSumAggregatorFactory(name, "count")), (String)StringUtils.format((String)"dataSource[%s] consumed [%,d] events, expected [%,d]", (Object[])new Object[]{this.fullDatasourceName, this.queryHelper.countRows(this.fullDatasourceName, Intervals.ETERNITY, name -> new LongSumAggregatorFactory(name, "count")), totalRows}));
    }

    private static class Specs {
        static final String SERIALIZER_PATH = "/stream/data/csv/serializer/serializer.json";
        static final String INPUT_FORMAT_PATH = "/stream/data/csv/input_format/input_format.json";
        static final String PARSER_TYPE = "inputFormat";
        static final int MAX_ROWS_PER_SEGMENT = 10000;

        private Specs() {
        }
    }
}

