/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for how scheduled and inflight compactions interact with delta commits,
 * rollbacks and replace commits on the timeline.
 *
 * <p>Every test starts from two delta commits ("001" and "004") containing 2000 generated
 * insert records, produced through {@code runNextDeltaCommits}.
 */
public class TestAsyncCompaction
extends CompactionTestBase {
    private HoodieWriteConfig getConfig(Boolean autoCommit) {
        return getConfigBuilder(autoCommit).build();
    }

    /** Creates a new meta client for the table at {@code cfg.getBasePath()} using the meta client builder. */
    private HoodieTableMetaClient metaClientFromBuilder(HoodieWriteConfig cfg) {
        return HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(cfg.getBasePath()).build();
    }

    /** Creates a new meta client for the table at {@code cfg.getBasePath()} using {@link HoodieTestUtils}. */
    private HoodieTableMetaClient metaClientFromTestUtils(HoodieWriteConfig cfg) {
        return HoodieTestUtils.createMetaClient(storageConf.newInstance(), cfg.getBasePath());
    }

    /** Returns the first pending compaction instant on the meta client's active timeline. */
    private HoodieInstant firstPendingCompaction(HoodieTableMetaClient metaClient) {
        return metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
    }

    /** Returns the first pending instant on the active timeline that is not a compaction. */
    private HoodieInstant firstPendingNonCompaction(HoodieTableMetaClient metaClient) {
        return metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
    }

    /**
     * Rolling back an inflight compaction must leave the compaction on the timeline in
     * REQUESTED state, with a non-empty instant file.
     */
    @Test
    public void testRollbackForInflightCompaction() throws Exception {
        HoodieWriteConfig cfg = getConfig(false);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            int numRecs = 2000;

            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            // Schedule the compaction and confirm it is pending in REQUESTED state.
            scheduleCompaction(compactionInstantTime, client, cfg);
            HoodieTableMetaClient metaClient = metaClientFromBuilder(cfg);
            HoodieInstant pendingCompactionInstant = firstPendingCompaction(metaClient);
            Assertions.assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
                    "Pending Compaction instant has expected instant time");
            Assertions.assertEquals(HoodieInstant.State.REQUESTED, pendingCompactionInstant.getState(),
                    "Pending Compaction instant has expected state");

            // Move it to inflight, then roll the inflight compaction back.
            moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
            metaClient = metaClientFromBuilder(cfg);
            HoodieSparkTable hoodieTable = HoodieSparkTable.create(cfg, (HoodieEngineContext) context, metaClient);
            hoodieTable.rollbackInflightCompaction(
                    new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", compactionInstantTime));

            // After the rollback the same compaction instant must be back in REQUESTED state.
            metaClient = metaClientFromBuilder(cfg);
            pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline()
                    .filterPendingCompactionTimeline().getInstantsAsStream().findFirst().get();
            Assertions.assertEquals("compaction", pendingCompactionInstant.getAction());
            Assertions.assertEquals(HoodieInstant.State.REQUESTED, pendingCompactionInstant.getState());
            Assertions.assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp());

            // The requested instant file must still exist and be non-empty.
            StoragePathInfo pathInfo = metaClient.getStorage()
                    .getPathInfo(new StoragePath(metaClient.getMetaPath(), pendingCompactionInstant.getFileName()));
            Assertions.assertTrue(pathInfo.getLength() > 0);
        }
    }

    /**
     * With a compaction pending at "005" and a delta commit pending at "006", starting a new
     * commit at "007" must leave exactly one pending non-compaction instant ("007") and must
     * not disturb the pending compaction.
     */
    @Test
    public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
        HoodieWriteConfig cfg = getConfig(false);
        String firstInstantTime = "001";
        String secondInstantTime = "004";
        String compactionInstantTime = "005";
        String inflightInstantTime = "006";
        String nextInflightInstantTime = "007";
        int numRecs = 2000;
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            scheduleCompaction(compactionInstantTime, client, cfg);
            HoodieTableMetaClient metaClient = metaClientFromTestUtils(cfg);
            createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);

            // Both the compaction and the delta commit are expected to be pending at this point.
            metaClient = metaClientFromTestUtils(cfg);
            HoodieInstant pendingCompactionInstant = firstPendingCompaction(metaClient);
            Assertions.assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
                    "Pending Compaction instant has expected instant time");
            HoodieInstant inflightInstant = firstPendingNonCompaction(metaClient);
            Assertions.assertEquals(inflightInstantTime, inflightInstant.getTimestamp(),
                    "inflight instant has expected instant time");

            // Starting the next commit should replace the earlier pending delta commit.
            client.startCommitWithTime(nextInflightInstantTime);

            metaClient = metaClientFromTestUtils(cfg);
            inflightInstant = firstPendingNonCompaction(metaClient);
            Assertions.assertEquals(nextInflightInstantTime, inflightInstant.getTimestamp(),
                    "inflight instant has expected instant time");
            Assertions.assertEquals(1,
                    metaClient.getActiveTimeline().filterPendingExcludingCompaction().countInstants(),
                    "Expect only one inflight instant");

            // The pending compaction must be untouched.
            pendingCompactionInstant = firstPendingCompaction(metaClient);
            Assertions.assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
                    "Pending Compaction instant has expected instant time");
        }
    }

    /**
     * Delta commits "006" and "007" are written while the compaction at "005" is inflight,
     * after which the compaction is executed.
     */
    @Test
    public void testInflightCompaction() throws Exception {
        HoodieWriteConfig cfg = getConfig(true);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            String thirdInstantTime = "006";
            String fourthInstantTime = "007";
            int numRecs = 2000;

            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            HoodieTableMetaClient metaClient = metaClientFromTestUtils(cfg);
            HoodieSparkTable hoodieTable = getHoodieTable(metaClient, cfg);
            scheduleCompaction(compactionInstantTime, client, cfg);
            moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);

            // Ingest two more delta commits with the compaction still inflight, then compact.
            runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime),
                    records, cfg, false, Arrays.asList(compactionInstantTime));
            executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
        }
    }

    /**
     * With a compaction pending at a freshly generated instant time, running a delta commit at
     * the fixed instant "005" must fail with {@link IllegalArgumentException}.
     */
    @Test
    public void testScheduleIngestionBeforePendingCompaction() throws Exception {
        HoodieWriteConfig cfg = getConfig(false);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String failedInstantTime = "005";
            int numRecs = 2000;

            List<HoodieRecord> initialRecords = dataGen.generateInserts(firstInstantTime, numRecs);
            List<HoodieRecord> records = runNextDeltaCommits(client, readClient,
                    Arrays.asList(firstInstantTime, secondInstantTime), initialRecords, cfg, true, new ArrayList<>());

            // The compaction instant time is generated rather than hard-coded.
            String compactInstantTime = HoodieActiveTimeline.createNewInstantTime();
            scheduleCompaction(compactInstantTime, client, cfg);
            HoodieTableMetaClient metaClient = metaClientFromTestUtils(cfg);
            HoodieInstant pendingCompactionInstant = firstPendingCompaction(metaClient);
            Assertions.assertEquals(compactInstantTime, pendingCompactionInstant.getTimestamp(),
                    "Pending Compaction instant has expected instant time");

            Assertions.assertThrows(IllegalArgumentException.class,
                    () -> runNextDeltaCommits(client, readClient, Collections.singletonList(failedInstantTime),
                            records, cfg, false, Collections.singletonList(compactInstantTime)),
                    "Latest pending compaction instant time can be earlier than this instant time");
        }
    }

    /**
     * With a delta commit pending at "005", scheduling a compaction at "006" through a newly
     * obtained write client must not put any pending compaction on the timeline.
     */
    @Test
    public void testScheduleCompactionAfterPendingIngestion() throws Exception {
        HoodieWriteConfig cfg = getConfig(false);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String inflightInstantTime = "005";
            String compactionInstantTime = "006";
            int numRecs = 2000;

            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            HoodieTableMetaClient metaClient = metaClientFromTestUtils(cfg);
            createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);

            metaClient = metaClientFromTestUtils(cfg);
            HoodieInstant inflightInstant = firstPendingNonCompaction(metaClient);
            Assertions.assertEquals(inflightInstantTime, inflightInstant.getTimestamp(),
                    "inflight instant has expected instant time");

            // The scheduling attempt goes through a second client instance, as in the original
            // flow; it is closed here instead of being leaked.
            try (SparkRDDWriteClient schedulingClient = getHoodieWriteClient(cfg)) {
                schedulingClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
                metaClient = metaClientFromBuilder(cfg);
                Assertions.assertFalse(
                        metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().isPresent());
            }
        }
    }

    /**
     * Scheduling a compaction must fail with {@link IllegalArgumentException} when its instant
     * time is older than a committed instant, equal to a committed instant, or equal to an
     * already pending compaction.
     */
    @Test
    public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
        HoodieWriteConfig cfg = getConfig(false);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "002";
            int numRecs = 2000;

            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            // "002" is older than the committed "004".
            Assertions.assertThrows(IllegalArgumentException.class,
                    () -> scheduleCompaction(compactionInstantTime, client, cfg),
                    "Compaction Instant to be scheduled cannot have older timestamp");

            // "004" collides with a committed instant.
            Assertions.assertThrows(IllegalArgumentException.class,
                    () -> scheduleCompaction(secondInstantTime, client, cfg),
                    "Compaction Instant to be scheduled cannot have same timestamp as committed instant");

            String compactionInstantTime2 = "006";
            scheduleCompaction(compactionInstantTime2, client, cfg);

            // Re-use the instant time of the compaction that is now pending. The previous code
            // re-checked "004" here, which only repeated the committed-instant case above and
            // did not exercise what the assertion message describes.
            Assertions.assertThrows(IllegalArgumentException.class,
                    () -> scheduleCompaction(compactionInstantTime2, client, cfg),
                    "Compaction Instant to be scheduled cannot have same timestamp as a pending compaction");
        }
    }

    /** Schedules and executes a compaction at "005" directly after the two initial delta commits. */
    @Test
    public void testCompactionAfterTwoDeltaCommits() throws Exception {
        HoodieWriteConfig cfg = getConfig(true);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            int numRecs = 2000;

            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            HoodieTableMetaClient metaClient = metaClientFromTestUtils(cfg);
            HoodieSparkTable hoodieTable = getHoodieTable(metaClient, cfg);
            scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
        }
    }

    /**
     * Delta commits "006" and "007" are written while the compaction at "005" is scheduled but
     * not yet started, after which the compaction is executed.
     */
    @Test
    public void testInterleavedCompaction() throws Exception {
        HoodieWriteConfig cfg = getConfig(true);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            String thirdInstantTime = "006";
            String fourthInstantTime = "007";
            int numRecs = 2000;

            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            HoodieTableMetaClient metaClient = metaClientFromTestUtils(cfg);
            HoodieSparkTable hoodieTable = getHoodieTable(metaClient, cfg);
            scheduleCompaction(compactionInstantTime, client, cfg);

            runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime),
                    records, cfg, false, Arrays.asList(compactionInstantTime));
            executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
        }
    }

    /**
     * A compaction is scheduled at "005", then an insert-overwrite replace commit at "006"
     * produces file groups disjoint from the earlier ones; the compaction is then executed
     * against the replaced file groups.
     */
    @Test
    public void testCompactionOnReplacedFiles() throws Exception {
        HoodieWriteConfig cfg = getConfig(true);
        try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
            SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            String replaceInstantTime = "006";
            int numRecs = 2000;

            List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
            runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime),
                    records, cfg, true, new ArrayList<>());

            HoodieTableMetaClient metaClient = metaClientFromTestUtils(cfg);
            HoodieSparkTable hoodieTable = getHoodieTable(metaClient, cfg);
            scheduleCompaction(compactionInstantTime, client, cfg);
            metaClient.reloadActiveTimeline();
            HoodieInstant pendingCompactionInstant = firstPendingCompaction(metaClient);
            Assertions.assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
                    "Pending Compaction instant has expected instant time");

            // Snapshot the file groups before they are replaced.
            Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());

            JavaRDD<HoodieRecord> replaceRecords =
                    jsc.parallelize(dataGen.generateInserts(replaceInstantTime, numRecs), 1);
            client.startCommitWithTime(replaceInstantTime, "replacecommit");
            client.insertOverwrite(replaceRecords, replaceInstantTime);

            metaClient.reloadActiveTimeline();
            hoodieTable = getHoodieTable(metaClient, cfg);
            Set<HoodieFileGroupId> newFileGroups = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());

            // None of the file groups visible after the replace commit may predate it.
            Assertions.assertEquals(0L, newFileGroups.stream().filter(fileGroupsBeforeReplace::contains).count());
            executeCompactionWithReplacedFiles(compactionInstantTime, client, hoodieTable, cfg,
                    dataGen.getPartitionPaths(), fileGroupsBeforeReplace);
        }
    }

    /** Collects the file group ids of the latest file slices across the given partitions. */
    private Set<HoodieFileGroupId> getAllFileGroups(HoodieTable table, String[] partitions) {
        return Arrays.stream(partitions)
                .flatMap(partition -> table.getSliceView().getLatestFileSlices(partition)
                        .map(slice -> slice.getFileGroupId()))
                .collect(Collectors.toSet());
    }
}

