/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTestBase;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestInlineCompaction
extends CompactionTestBase {
    private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
        return this.getConfigBuilder(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(true)).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).withMaxDeltaSecondsBeforeCompaction(maxDeltaTime).withInlineCompactionTriggerStrategy(inlineCompactionType).build()).build();
    }

    private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
        return this.getConfigBuilder(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(false)).withScheduleInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).withMaxDeltaSecondsBeforeCompaction(maxDeltaTime).withInlineCompactionTriggerStrategy(inlineCompactionType).build()).build();
    }

    private void waitForMs(long timeMs) {
        try {
            Thread.sleep(timeMs);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while killing time", e);
        }
    }

    @Test
    public void testCompactionIsNotScheduledEarly() throws Exception {
        HoodieWriteConfig cfg = this.getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List records = this.dataGen.generateInserts(writeClient.createNewInstantTime(), Integer.valueOf(100));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            List instants = IntStream.range(0, 2).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)2, (int)metaClient.getActiveTimeline().getWriteTimeline().countInstants());
        }
    }

    @Test
    public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
        HoodieWriteConfig cfg = this.getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List instants = IntStream.range(0, 2).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            List records = this.dataGen.generateInserts((String)instants.get(0), Integer.valueOf(100));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            String finalInstant = writeClient.createNewInstantTime();
            this.createNextDeltaCommit(finalInstant, this.dataGen.generateUpdates(finalInstant, Integer.valueOf(100)), writeClient, metaClient, cfg, false);
            metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)4, (int)metaClient.getActiveTimeline().getWriteTimeline().countInstants());
            Assertions.assertEquals((Object)"commit", (Object)((HoodieInstant)metaClient.getActiveTimeline().lastInstant().get()).getAction());
            String compactionTime = ((HoodieInstant)metaClient.getActiveTimeline().lastInstant().get()).requestedTime();
            Assertions.assertFalse((boolean)WriteMarkersFactory.get((MarkerType)cfg.getMarkersType(), (HoodieTable)HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context), (String)compactionTime).doesMarkerDirExist());
        }
    }

    @Test
    public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() throws Exception {
        HoodieWriteConfig cfg = this.getConfigDisableCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List instants = IntStream.range(0, 4).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            List records = this.dataGen.generateInserts((String)instants.get(0), Integer.valueOf(100));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            String requestInstant = writeClient.createNewInstantTime();
            this.scheduleCompaction(requestInstant, writeClient, cfg);
            this.metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((long)this.metaClient.getActiveTimeline().getInstantsAsStream().filter(hoodieInstant -> hoodieInstant.getAction().equals("compaction") && hoodieInstant.getState() == HoodieInstant.State.REQUESTED).count(), (long)1L);
            requestInstant = writeClient.createNewInstantTime();
            try {
                this.scheduleCompaction(requestInstant, writeClient, cfg);
                Iterator iterator = Assertions.fail();
            }
            catch (AssertionError assertionError) {
                // empty catch block
            }
            instants = IntStream.range(0, 4).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            records = this.dataGen.generateInsertsForPartition((String)instants.get(0), Integer.valueOf(100), "2022/03/15");
            for (String instant2 : instants) {
                this.createNextDeltaCommit(instant2, records, writeClient, this.metaClient, cfg, false);
            }
            requestInstant = writeClient.createNewInstantTime();
            this.scheduleCompaction(requestInstant, writeClient, cfg);
            cfg = this.getConfigForInlineCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
            try (SparkRDDWriteClient newWriteClient = this.getHoodieWriteClient(cfg);){
                String finalInstant = newWriteClient.createNewInstantTime();
                this.createNextDeltaCommit(finalInstant, this.dataGen.generateUpdates(finalInstant, Integer.valueOf(100)), newWriteClient, this.metaClient, cfg, false);
            }
            this.metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)this.metaClient.getActiveTimeline().getCommitsTimeline().filter(instant -> instant.getAction().equals("commit")).countInstants(), (int)2);
            Assertions.assertEquals((int)this.metaClient.getActiveTimeline().getCommitsTimeline().filterPendingCompactionTimeline().countInstants(), (int)0);
            Assertions.assertEquals((Object)"deltacommit", (Object)((HoodieInstant)this.metaClient.getActiveTimeline().lastInstant().get()).getAction());
        }
    }

    @Test
    public void testSuccessfulCompactionBasedOnTime() throws Exception {
        HoodieWriteConfig cfg = this.getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            String instantTime = writeClient.createNewInstantTime();
            List records = this.dataGen.generateInserts(instantTime, Integer.valueOf(10));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            this.runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime), records, cfg, true, new ArrayList());
            this.waitForMs(10000L);
            String finalInstant = writeClient.createNewInstantTime();
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            this.createNextDeltaCommit(finalInstant, this.dataGen.generateUpdates(finalInstant, Integer.valueOf(100)), writeClient, metaClient, cfg, false);
            metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)3, (int)metaClient.getActiveTimeline().getWriteTimeline().countInstants());
            Assertions.assertEquals((Object)"commit", (Object)((HoodieInstant)metaClient.getActiveTimeline().lastInstant().get()).getAction());
        }
    }

    @Test
    public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
        HoodieWriteConfig cfg = this.getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_OR_TIME);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List records = this.dataGen.generateInserts(writeClient.createNewInstantTime(), Integer.valueOf(10));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            List instants = IntStream.range(0, 2).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            String finalInstant = writeClient.createNewInstantTime();
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            this.createNextDeltaCommit(finalInstant, this.dataGen.generateUpdates(finalInstant, Integer.valueOf(10)), writeClient, metaClient, cfg, false);
            metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)4, (int)metaClient.getActiveTimeline().getWriteTimeline().countInstants());
            metaClient = this.createMetaClient(cfg.getBasePath());
            this.waitForMs(60000L);
            finalInstant = writeClient.createNewInstantTime();
            this.createNextDeltaCommit(finalInstant, this.dataGen.generateUpdates(finalInstant, Integer.valueOf(10)), writeClient, metaClient, cfg, false);
            metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)6, (int)metaClient.getActiveTimeline().getWriteTimeline().countInstants());
        }
    }

    @Test
    public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
        HoodieWriteConfig cfg = this.getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_AND_TIME);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List records = this.dataGen.generateInserts(writeClient.createNewInstantTime(), Integer.valueOf(10));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            List instants = IntStream.range(0, 2).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)2, (int)metaClient.getActiveTimeline().getWriteTimeline().countInstants());
            metaClient = this.createMetaClient(cfg.getBasePath());
            this.waitForMs(20000L);
            String finalInstant = writeClient.createNewInstantTime();
            this.createNextDeltaCommit(finalInstant, this.dataGen.generateUpdates(finalInstant, Integer.valueOf(10)), writeClient, metaClient, cfg, false);
            metaClient = this.createMetaClient(cfg.getBasePath());
            Assertions.assertEquals((int)4, (int)metaClient.getActiveTimeline().getWriteTimeline().countInstants());
        }
    }

    @Test
    public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception {
        String instantTime2;
        HoodieWriteConfig cfg = this.getConfigBuilder(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(false)).withMaxNumDeltaCommitsBeforeCompaction(1).build()).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List instants = IntStream.range(0, 2).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            List records = this.dataGen.generateInserts((String)instants.get(0), Integer.valueOf(100));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            instantTime2 = writeClient.createNewInstantTime();
            this.scheduleCompaction(instantTime2, writeClient, cfg);
            this.moveCompactionFromRequestedToInflight(instantTime2, cfg);
        }
        HoodieWriteConfig inlineCfg = this.getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(inlineCfg);){
            String instantTime3 = writeClient.createNewInstantTime();
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            this.createNextDeltaCommit(instantTime3, this.dataGen.generateUpdates(instantTime3, Integer.valueOf(100)), writeClient, metaClient, inlineCfg, false);
        }
        this.metaClient = this.createMetaClient(cfg.getBasePath());
        Assertions.assertEquals((int)4, (int)this.metaClient.getActiveTimeline().getWriteTimeline().countInstants());
        Assertions.assertEquals((Object)instantTime2, (Object)((HoodieInstant)this.metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get()).requestedTime());
    }

    @Test
    public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
        String instantTime;
        HoodieWriteConfig cfg = this.getConfigBuilder(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(false)).withMaxDeltaSecondsBeforeCompaction(5).withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.TIME_ELAPSED).build()).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List instants = IntStream.range(0, 2).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            List records = this.dataGen.generateInserts((String)instants.get(0), Integer.valueOf(100));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            this.waitForMs(10000L);
            instantTime = writeClient.createNewInstantTime();
            this.scheduleCompaction(instantTime, writeClient, cfg);
            this.moveCompactionFromRequestedToInflight(instantTime, cfg);
        }
        HoodieWriteConfig inlineCfg = this.getConfigForInlineCompaction(5, 1000, CompactionTriggerStrategy.TIME_ELAPSED);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(inlineCfg);){
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            String instantTime2 = writeClient.createNewInstantTime();
            this.createNextDeltaCommit(instantTime2, this.dataGen.generateUpdates(instantTime2, Integer.valueOf(10)), writeClient, metaClient, inlineCfg, false);
        }
        this.metaClient = this.createMetaClient(cfg.getBasePath());
        Assertions.assertEquals((int)4, (int)this.metaClient.getActiveTimeline().getWriteTimeline().countInstants());
        Assertions.assertEquals((Object)instantTime, (Object)((HoodieInstant)this.metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get()).requestedTime());
    }

    @Test
    public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception {
        String instantTime;
        HoodieWriteConfig cfg = this.getConfigBuilder(false).withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(Boolean.valueOf(false)).withMaxDeltaSecondsBeforeCompaction(1).withMaxNumDeltaCommitsBeforeCompaction(1).withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_AND_TIME).build()).build();
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(cfg);){
            List instants = IntStream.range(0, 2).mapToObj(i -> writeClient.createNewInstantTime()).collect(Collectors.toList());
            List records = this.dataGen.generateInserts((String)instants.get(0), Integer.valueOf(10));
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            this.runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList());
            instantTime = writeClient.createNewInstantTime();
            this.scheduleCompaction(instantTime, writeClient, cfg);
            this.moveCompactionFromRequestedToInflight(instantTime, cfg);
        }
        HoodieWriteConfig inlineCfg = this.getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
        try (SparkRDDWriteClient writeClient = this.getHoodieWriteClient(inlineCfg);){
            HoodieTableMetaClient metaClient = this.createMetaClient(cfg.getBasePath());
            String instantTime2 = writeClient.createNewInstantTime();
            this.createNextDeltaCommit(instantTime2, this.dataGen.generateUpdates(instantTime2, Integer.valueOf(10)), writeClient, metaClient, inlineCfg, false);
        }
        this.metaClient = this.createMetaClient(cfg.getBasePath());
        Assertions.assertEquals((int)4, (int)this.metaClient.getActiveTimeline().getWriteTimeline().countInstants());
        Assertions.assertEquals((Object)instantTime, (Object)((HoodieInstant)this.metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get()).requestedTime());
    }
}

