/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestAsyncCompaction
extends CompactionTestBase {
    private HoodieWriteConfig getConfig(Boolean autoCommit) {
        return this.getConfigBuilder(autoCommit).build();
    }

    @Test
    public void testRollbackForInflightCompaction() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(false);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            int numRecs = 2000;
            List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
            this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList());
            this.scheduleCompaction(compactionInstantTime, client, cfg);
            HoodieInstant pendingCompactionInstant = (HoodieInstant)this.metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals((Object)compactionInstantTime, (Object)pendingCompactionInstant.requestedTime(), (String)"Pending Compaction instant has expected instant time");
            Assertions.assertEquals((Object)HoodieInstant.State.REQUESTED, (Object)pendingCompactionInstant.getState(), (String)"Pending Compaction instant has expected state");
            this.moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
            this.metaClient.reloadActiveTimeline();
            HoodieSparkTable hoodieTable = HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context, (HoodieTableMetaClient)this.metaClient);
            hoodieTable.rollbackInflightCompaction(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, "compaction", compactionInstantTime));
            this.metaClient.reloadActiveTimeline();
            pendingCompactionInstant = (HoodieInstant)this.metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstantsAsStream().findFirst().get();
            Assertions.assertEquals((Object)"compaction", (Object)pendingCompactionInstant.getAction());
            Assertions.assertEquals((Object)HoodieInstant.State.REQUESTED, (Object)pendingCompactionInstant.getState());
            Assertions.assertEquals((Object)compactionInstantTime, (Object)pendingCompactionInstant.requestedTime());
            StoragePathInfo pathInfo = this.metaClient.getStorage().getPathInfo(new StoragePath(this.metaClient.getTimelinePath(), HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(pendingCompactionInstant)));
            Assertions.assertTrue((pathInfo.getLength() > 0L ? 1 : 0) != 0);
        }
    }

    @Test
    public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(false);
        String firstInstantTime = "001";
        String secondInstantTime = "004";
        String compactionInstantTime = "005";
        String inflightInstantTime = "006";
        String nextInflightInstantTime = "007";
        int numRecs = 2000;
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
            records = this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList());
            this.scheduleCompaction(compactionInstantTime, client, cfg);
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            this.createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
            metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            HoodieInstant pendingCompactionInstant = (HoodieInstant)metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals((Object)compactionInstantTime, (Object)pendingCompactionInstant.requestedTime(), (String)"Pending Compaction instant has expected instant time");
            HoodieInstant inflightInstant = (HoodieInstant)metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
            Assertions.assertEquals((Object)inflightInstantTime, (Object)inflightInstant.requestedTime(), (String)"inflight instant has expected instant time");
            client.startCommitWithTime(nextInflightInstantTime);
            metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            inflightInstant = (HoodieInstant)metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
            Assertions.assertEquals((Object)inflightInstant.requestedTime(), (Object)nextInflightInstantTime, (String)"inflight instant has expected instant time");
            Assertions.assertEquals((int)1, (int)metaClient.getActiveTimeline().filterPendingExcludingCompaction().countInstants(), (String)"Expect only one inflight instant");
            pendingCompactionInstant = (HoodieInstant)metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals((Object)compactionInstantTime, (Object)pendingCompactionInstant.requestedTime(), (String)"Pending Compaction instant has expected instant time");
        }
    }

    @Test
    public void testInflightCompaction() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(true);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            String thirdInstantTime = "006";
            String fourthInstantTime = "007";
            int numRecs = 2000;
            List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
            records = this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList());
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            HoodieSparkTable hoodieTable = this.getHoodieTable(metaClient, cfg);
            this.scheduleCompaction(compactionInstantTime, client, cfg);
            this.moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
            this.runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, Arrays.asList(compactionInstantTime));
            this.executeCompaction(compactionInstantTime, client, (HoodieTable)hoodieTable, cfg, numRecs, true);
        }
    }

    @Test
    public void testConcurrentCompaction() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(false);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String pendingInstantTime = "002";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            String thirdInstantTime = "006";
            String fourthInstantTime = "007";
            int numRecs = 2000;
            List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
            records = this.runNextDeltaCommits(client, readClient, Collections.singletonList(firstInstantTime), records, cfg, true, Collections.emptyList());
            this.metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            this.createNextDeltaCommit(pendingInstantTime, this.dataGen.generateUpdates(pendingInstantTime, Integer.valueOf(records.size())), client, this.metaClient, cfg, true);
            this.runNextDeltaCommits(client, readClient, Arrays.asList(secondInstantTime, thirdInstantTime, fourthInstantTime), records, cfg, false, Collections.emptyList());
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            HoodieSparkTable hoodieTable = this.getHoodieTable(metaClient, cfg);
            this.scheduleCompaction(compactionInstantTime, client, cfg);
            this.moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
            HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan((byte[])((byte[])metaClient.reloadActiveTimeline().readCompactionPlanAsBytes(HoodieTestUtils.INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime)).get()));
            Assertions.assertTrue((boolean)compactionPlan.getOperations().stream().noneMatch(op -> op.getDeltaFilePaths().stream().anyMatch(deltaFile -> FSUtils.getCommitTime((String)deltaFile).equals(pendingInstantTime))), (String)("compaction plan should not include pending log files. Data file paths " + new HashSet(compactionPlan.getOperations())));
            this.executeCompaction(compactionInstantTime, client, (HoodieTable)hoodieTable, cfg, numRecs, true);
        }
    }

    @Test
    public void testScheduleIngestionBeforePendingCompaction() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(false);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
        String firstInstantTime = "001";
        String secondInstantTime = "004";
        String failedInstantTime = "005";
        int numRecs = 2000;
        List initialRecords = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
        List records = this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), initialRecords, cfg, true, new ArrayList());
        String compactionInstantTime = client.createNewInstantTime();
        this.scheduleCompaction(compactionInstantTime, client, cfg);
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
        HoodieInstant pendingCompactionInstant = (HoodieInstant)metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
        Assertions.assertEquals((Object)compactionInstantTime, (Object)pendingCompactionInstant.requestedTime(), (String)"Pending Compaction instant has expected instant time");
        Assertions.assertDoesNotThrow(() -> this.runNextDeltaCommits(client, readClient, Collections.singletonList(failedInstantTime), records, cfg, false, Collections.singletonList(compactionInstantTime)), (String)"Latest pending compaction instant time can be earlier than this instant time");
    }

    @Test
    public void testScheduleCompactionAfterPendingIngestion() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(false);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
        String firstInstantTime = "001";
        String secondInstantTime = "004";
        String inflightInstantTime = "005";
        String compactionInstantTime = "006";
        int numRecs = 2000;
        List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
        records = this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList());
        HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
        this.createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
        metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
        HoodieInstant inflightInstant = (HoodieInstant)metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
        Assertions.assertEquals((Object)inflightInstantTime, (Object)inflightInstant.requestedTime(), (String)"inflight instant has expected instant time");
        Assertions.assertDoesNotThrow(() -> this.scheduleCompaction(compactionInstantTime, client, cfg), (String)"Earliest ingestion inflight instant time can be smaller than the compaction time");
    }

    @Test
    public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(false);
        SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);
        SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
        String firstInstantTime = "001";
        String secondInstantTime = "004";
        String compactionInstantTime = "002";
        int numRecs = 2000;
        List records = this.dataGen.generateInserts("001", Integer.valueOf(numRecs));
        this.runNextDeltaCommits(client, readClient, Arrays.asList("001", "004"), records, cfg, true, new ArrayList());
        Assertions.assertNull((Object)this.tryScheduleCompaction("002", client, cfg), (String)"Compaction Instant can be scheduled with older timestamp");
        Assertions.assertNull((Object)this.tryScheduleCompaction("004", client, cfg), (String)"Compaction Instant to be scheduled can have same timestamp as committed instant");
        String compactionInstantTime2 = client.createNewInstantTime();
        Assertions.assertNotNull((Object)this.tryScheduleCompaction(compactionInstantTime2, client, cfg), (String)"Compaction Instant can be scheduled with greater timestamp");
    }

    @Test
    public void testCompactionAfterTwoDeltaCommits() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(true);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            int numRecs = 2000;
            List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
            this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList());
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            HoodieSparkTable hoodieTable = this.getHoodieTable(metaClient, cfg);
            this.scheduleAndExecuteCompaction(compactionInstantTime, client, (HoodieTable)hoodieTable, cfg, numRecs, false);
        }
    }

    @Test
    public void testInterleavedCompaction() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(true);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            String thirdInstantTime = "006";
            String fourthInstantTime = "007";
            int numRecs = 2000;
            List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
            records = this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList());
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            HoodieSparkTable hoodieTable = this.getHoodieTable(metaClient, cfg);
            this.scheduleCompaction(compactionInstantTime, client, cfg);
            this.runNextDeltaCommits(client, readClient, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, Arrays.asList(compactionInstantTime));
            this.executeCompaction(compactionInstantTime, client, (HoodieTable)hoodieTable, cfg, numRecs, true);
        }
    }

    @Test
    public void testCompactionOnReplacedFiles() throws Exception {
        HoodieWriteConfig cfg = this.getConfig(true);
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            SparkRDDReadClient readClient = this.getHoodieReadClient(cfg.getBasePath());
            String firstInstantTime = "001";
            String secondInstantTime = "004";
            String compactionInstantTime = "005";
            String replaceInstantTime = "006";
            int numRecs = 2000;
            List records = this.dataGen.generateInserts(firstInstantTime, Integer.valueOf(numRecs));
            this.runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList());
            HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient((StorageConfiguration)this.storageConf.newInstance(), (String)cfg.getBasePath());
            HoodieSparkTable hoodieTable = this.getHoodieTable(metaClient, cfg);
            this.scheduleCompaction(compactionInstantTime, client, cfg);
            metaClient.reloadActiveTimeline();
            HoodieInstant pendingCompactionInstant = (HoodieInstant)metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
            Assertions.assertEquals((Object)compactionInstantTime, (Object)pendingCompactionInstant.requestedTime(), (String)"Pending Compaction instant has expected instant time");
            Set<HoodieFileGroupId> fileGroupsBeforeReplace = this.getAllFileGroups((HoodieTable)hoodieTable, this.dataGen.getPartitionPaths());
            JavaRDD replaceRecords = this.jsc.parallelize(this.dataGen.generateInserts(replaceInstantTime, Integer.valueOf(numRecs)), 1);
            client.startCommitWithTime(replaceInstantTime, "replacecommit");
            client.insertOverwrite(replaceRecords, replaceInstantTime);
            metaClient.reloadActiveTimeline();
            hoodieTable = this.getHoodieTable(metaClient, cfg);
            Set<HoodieFileGroupId> newFileGroups = this.getAllFileGroups((HoodieTable)hoodieTable, this.dataGen.getPartitionPaths());
            Assertions.assertEquals((long)0L, (long)newFileGroups.stream().filter(fg -> fileGroupsBeforeReplace.contains(fg)).count());
            this.executeCompactionWithReplacedFiles(compactionInstantTime, client, (HoodieTable)hoodieTable, cfg, this.dataGen.getPartitionPaths(), fileGroupsBeforeReplace);
        }
    }

    private Set<HoodieFileGroupId> getAllFileGroups(HoodieTable table, String[] partitions) {
        return Arrays.stream(partitions).flatMap(partition -> table.getSliceView().getLatestFileSlices(partition).map(fg -> fg.getFileGroupId())).collect(Collectors.toSet());
    }
}

