/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.CompactionAdminClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.action.compact.OperationResult;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link CompactionAdminClient}: un-scheduling a whole compaction plan, un-scheduling the
 * compaction of a single file group, and repairing a compaction plan after its log files were renamed.
 *
 * <p>Every test runs against a {@code MERGE_ON_READ} table that is created under {@code basePath} in
 * {@link #setUp()}, and populated through
 * {@code CompactionTestUtils.setupAndValidateCompactionOperations}. The tests treat "000", "002", "004"
 * and "006" as ingestion instants and "001", "003", "005" and "007" as the matching compaction instants.
 */
public class TestCompactionAdminClient
extends HoodieClientTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestCompactionAdminClient.class);
    private HoodieTableMetaClient metaClient;
    private CompactionAdminClient client;

    /**
     * Initializes the base path and Spark contexts, creates a {@code MERGE_ON_READ} table and builds the
     * {@link CompactionAdminClient} under test.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        initPath();
        initSparkContexts();
        metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(), basePath, HoodieTableType.MERGE_ON_READ);
        client = new CompactionAdminClient(context, basePath);
    }

    /** Releases the resources acquired by the base test class. */
    @AfterEach
    public void cleanUp() throws Exception {
        cleanupResources();
    }

    /**
     * Un-schedules each of the four compaction plans and checks the proposed log-file renames.
     *
     * <p>The first two plans are expected to need two renames per file group and the last two none.
     * NOTE(review): presumably only the first two compaction instants have later delta commits in the
     * fixture built by {@code CompactionTestUtils} - confirm there.
     */
    @Test
    public void testUnscheduleCompactionPlan() throws Exception {
        int numEntriesPerInstant = 10;
        CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
            numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
        validateUnSchedulePlan(client, "000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
        validateUnSchedulePlan(client, "002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
        validateUnSchedulePlan(client, "004", "005", numEntriesPerInstant, 0);
        validateUnSchedulePlan(client, "006", "007", numEntriesPerInstant, 0);
    }

    /**
     * Picks the first operation of every compaction plan and un-schedules just that file group,
     * expecting two renames for the first two plans and none for the last two.
     */
    @Test
    public void testUnscheduleCompactionFileId() throws Exception {
        int numEntriesPerInstant = 10;
        CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
            numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);

        // Compaction instant -> first operation listed in that instant's compaction plan.
        Map<String, CompactionOperation> instantsWithOp = Stream.of("001", "003", "005", "007")
            .map(instant -> {
                try {
                    return Pair.of(instant, CompactionUtils.getCompactionPlan(metaClient, instant));
                } catch (Exception ioe) {
                    throw new HoodieException(ioe);
                }
            })
            .map(instantWithPlan -> instantWithPlan.getRight().getOperations().stream()
                .map(op -> Pair.of(instantWithPlan.getLeft(), CompactionOperation.convertFromAvroRecordInstance(op)))
                .findFirst()
                .get())
            .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

        validateUnScheduleFileId(client, "000", "001", instantsWithOp.get("001"), 2);
        validateUnScheduleFileId(client, "002", "003", instantsWithOp.get("003"), 2);
        validateUnScheduleFileId(client, "004", "005", instantsWithOp.get("005"), 0);
        validateUnScheduleFileId(client, "006", "007", instantsWithOp.get("007"), 0);
    }

    /**
     * Renames the log files of each compaction plan, then checks that the repair actions proposed by
     * {@link CompactionAdminClient} undo exactly those renames.
     */
    @Test
    public void testRepairCompactionPlan() throws Exception {
        int numEntriesPerInstant = 10;
        CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
            numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
        validateRepair("000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);
        validateRepair("002", "003", numEntriesPerInstant, 2 * numEntriesPerInstant);
        validateRepair("004", "005", numEntriesPerInstant, 0);
        validateRepair("006", "007", numEntriesPerInstant, 0);
    }

    /**
     * Runs the rename-only flavour of {@code validateUnSchedulePlan} and then verifies that the renaming
     * actions computed for every validated operation roll those renames back.
     *
     * @param ingestionInstant     instant of the file slices the log files are moved to
     * @param compactionInstant    compaction instant whose plan is validated and repaired
     * @param numEntriesPerInstant expected number of file ids at the ingestion instant
     * @param expNumRepairs        expected number of renames, and therefore of undo actions
     */
    private void validateRepair(String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRepairs) throws Exception {
        List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
            validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
        metaClient = HoodieTestUtils.createMetaClient(metaClient.getStorageConf(), basePath);

        List<? extends OperationResult<CompactionOperation>> resultBeforeRepair =
            client.validateCompactionPlan(metaClient, compactionInstant, 1);
        if (expNumRepairs > 0) {
            Assertions.assertTrue(resultBeforeRepair.stream().anyMatch(r -> !r.isSuccess()), "Expect some failures in validation");
        }

        // Apply the renaming actions proposed for every validated operation and keep them for comparison.
        List<Pair<HoodieLogFile, HoodieLogFile>> undoFiles = resultBeforeRepair.stream()
            .flatMap(r -> CompactionAdminClient.getRenamingActionsToAlignWithCompactionOperation(
                metaClient, compactionInstant, r.getOperation(), Option.empty()).stream())
            .map(rn -> {
                try {
                    CompactionAdminClient.renameLogFile(metaClient, rn.getKey(), rn.getValue());
                } catch (IOException e) {
                    throw new HoodieIOException(e.getMessage(), e);
                }
                return rn;
            })
            .collect(Collectors.toList());

        // Undo pairs are keyed by their target (right) path so that both maps read "original -> renamed".
        Map<String, String> renameFilesFromUndo = undoFiles.stream()
            .collect(Collectors.toMap(p -> p.getRight().getPath().toString(), x -> x.getLeft().getPath().toString()));
        Map<String, String> expRenameFiles = renameFiles.stream()
            .collect(Collectors.toMap(p -> p.getLeft().getPath().toString(), x -> x.getRight().getPath().toString()));

        if (expNumRepairs > 0) {
            Assertions.assertFalse(renameFiles.isEmpty(), "Rename Files must be non-empty");
        } else {
            Assertions.assertTrue(renameFiles.isEmpty(), "Rename Files must be empty");
        }
        expRenameFiles.forEach((key, value) ->
            LOG.info("Key :{} renamed to {} rolled back to {}", key, value, renameFilesFromUndo.get(key)));
        Assertions.assertEquals(expRenameFiles, renameFilesFromUndo, "Undo must completely rollback renamed files");

        // After the rollback the plan has to validate cleanly again.
        List<? extends OperationResult<CompactionOperation>> resultAfterRepair =
            client.validateCompactionPlan(metaClient, compactionInstant, 1);
        Assertions.assertTrue(resultAfterRepair.stream().allMatch(OperationResult::isSuccess), "Expect no failures in validation");
        Assertions.assertEquals(expNumRepairs, undoFiles.size(), "Expected Num Repairs");
    }

    /**
     * Rebuilds the meta client and asserts that every operation of the given compaction plan validates.
     *
     * @param compactionInstant compaction instant whose plan is validated
     */
    private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
        metaClient = HoodieTestUtils.createMetaClient(metaClient.getStorageConf(), basePath);
        List<? extends OperationResult<CompactionOperation>> validationResults =
            client.validateCompactionPlan(metaClient, compactionInstant, 1);
        Assertions.assertFalse(validationResults.stream().anyMatch(v -> !v.isSuccess()), "Some validations failed");
    }

    /**
     * Checks a list of proposed (old log file, new log file) renames: names must be unique on both sides,
     * the old file must carry the compaction instant and the new file the ingestion instant, the file id
     * must be kept, and the new log version must equal the old version plus the version of the first
     * log file of the merged slice found at or before the ingestion instant.
     *
     * @param renameFiles       proposed renames as (old, new) pairs
     * @param ingestionInstant  base commit time expected on the new log files
     * @param compactionInstant base commit time expected on the old log files
     * @param fsView            file-system view used to look up the log files before compaction
     */
    private void validateRenameFiles(List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles, String ingestionInstant, String compactionInstant, HoodieTableFileSystemView fsView) {
        Set<HoodieLogFile> uniqNewLogFiles = new HashSet<>();
        Set<HoodieLogFile> uniqOldLogFiles = new HashSet<>();
        renameFiles.forEach(lfPair -> {
            Assertions.assertFalse(uniqOldLogFiles.contains(lfPair.getKey()), "Old Log File Names do not collide");
            Assertions.assertFalse(uniqNewLogFiles.contains(lfPair.getValue()), "New Log File Names do not collide");
            uniqOldLogFiles.add(lfPair.getKey());
            uniqNewLogFiles.add(lfPair.getValue());
        });

        renameFiles.forEach(lfPair -> {
            HoodieLogFile oldLogFile = lfPair.getLeft();
            HoodieLogFile newLogFile = lfPair.getValue();
            Assertions.assertEquals(ingestionInstant, newLogFile.getBaseCommitTime(), "Base Commit time of ingestion instant is expected");
            Assertions.assertEquals(compactionInstant, oldLogFile.getBaseCommitTime(), "Base Commit time of compaction instant is expected");
            Assertions.assertEquals(oldLogFile.getFileId(), newLogFile.getFileId(), "File Id is expected");

            HoodieLogFile lastLogFileBeforeCompaction = fsView
                .getLatestMergedFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], ingestionInstant)
                .filter(fs -> fs.getFileId().equals(oldLogFile.getFileId()))
                .map(fs -> fs.getLogFiles().findFirst().get())
                .findFirst()
                .get();
            Assertions.assertEquals(lastLogFileBeforeCompaction.getLogVersion() + oldLogFile.getLogVersion(),
                newLogFile.getLogVersion(), "Log Version expected");
            Assertions.assertTrue(newLogFile.getLogVersion() > lastLogFileBeforeCompaction.getLogVersion(),
                "Log version does not collide");
        });
    }

    /**
     * Convenience overload of the six-argument {@code validateUnSchedulePlan} with
     * {@code skipUnSchedule = false}.
     */
    private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(CompactionAdminClient client, String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames) throws Exception {
        return validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRenames, false);
    }

    /**
     * Validates the renames proposed for un-scheduling a compaction plan, un-schedules it and checks the
     * resulting file-system view.
     *
     * @param client               admin client under test
     * @param ingestionInstant     instant of the file slices that should own the log files afterwards
     * @param compactionInstant    compaction instant to un-schedule
     * @param numEntriesPerInstant expected number of file ids at the ingestion instant afterwards
     * @param expNumRenames        expected number of proposed renames
     * @param skipUnSchedule       when {@code true}, the proposed renames are applied directly instead of
     *                             being checked with {@code validateRenameFiles}; note that
     *                             {@code unscheduleCompactionPlan} is still invoked afterwards either way
     * @return the proposed renames as (old log file, new log file) pairs
     */
    private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(CompactionAdminClient client, String ingestionInstant, String compactionInstant, int numEntriesPerInstant, int expNumRenames, boolean skipUnSchedule) throws Exception {
        ensureValidCompactionPlan(compactionInstant);
        String partition = HoodieTestUtils.DEFAULT_PARTITION_PATHS[0];

        List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
            client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
        metaClient = HoodieTestUtils.createMetaClient(metaClient.getStorageConf(), basePath);

        // The files proposed for renaming must be exactly the log files of the slices based on the compaction instant.
        Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet());
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
        Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(partition)
            .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
            .flatMap(FileSlice::getLogFiles)
            .collect(Collectors.toSet());
        Assertions.assertEquals(expLogFilesToBeRenamed, gotLogFilesToBeRenamed, "Log files belonging to file-slices created because of compaction request must be renamed");

        if (skipUnSchedule) {
            renameFiles.forEach(lfPair -> {
                try {
                    CompactionAdminClient.renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
                } catch (IOException e) {
                    throw new HoodieIOException(e.getMessage(), e);
                }
            });
        } else {
            validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView);
        }

        Map<String, Long> fileIdToCountsBeforeRenaming = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, compactionInstant)
            .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
            .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

        client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
        metaClient = HoodieTableMetaClient.builder()
            .setConf(metaClient.getStorageConf())
            .setBasePath(basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        HoodieTableFileSystemView newFsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());

        // Any slice still based on the compaction instant must be empty.
        newFsView.getLatestFileSlicesBeforeOrOn(partition, compactionInstant, true)
            .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
            .forEach(fs -> {
                Assertions.assertFalse(fs.getBaseFile().isPresent(), "No Data file must be present");
                Assertions.assertEquals(0L, fs.getLogFiles().count(), "No Log Files");
            });

        Map<String, Long> fileIdToCountsAfterRenaming = newFsView.getAllFileGroups(partition)
            .flatMap(HoodieFileGroup::getAllFileSlices)
            .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
            .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        Assertions.assertEquals(fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming, "Each File Id has same number of log-files");
        Assertions.assertEquals(numEntriesPerInstant, fileIdToCountsAfterRenaming.size(), "Not Empty");
        Assertions.assertEquals(expNumRenames, renameFiles.size(), "Expected number of renamed files");
        return renameFiles;
    }

    /**
     * Validates the renames proposed for un-scheduling a single compaction operation, un-schedules the
     * operation's file group and checks the resulting file-system view for that file id only.
     *
     * @param client            admin client under test
     * @param ingestionInstant  instant of the file slice that should own the log files afterwards
     * @param compactionInstant compaction instant the operation belongs to
     * @param op                compaction operation to un-schedule
     * @param expNumRenames     expected number of proposed renames
     */
    private void validateUnScheduleFileId(CompactionAdminClient client, String ingestionInstant, String compactionInstant, CompactionOperation op, int expNumRenames) throws Exception {
        ensureValidCompactionPlan(compactionInstant);
        String partition = HoodieTestUtils.DEFAULT_PARTITION_PATHS[0];

        List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
            client.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, Option.empty(), false);
        metaClient = HoodieTableMetaClient.builder()
            .setConf(metaClient.getStorageConf())
            .setBasePath(basePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();

        // Only the log files of this file id's slice at the compaction instant may be proposed for renaming.
        Set<HoodieLogFile> gotLogFilesToBeRenamed = renameFiles.stream().map(Pair::getLeft).collect(Collectors.toSet());
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
        Set<HoodieLogFile> expLogFilesToBeRenamed = fsView.getLatestFileSlices(partition)
            .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
            .filter(fs -> fs.getFileId().equals(op.getFileId()))
            .flatMap(FileSlice::getLogFiles)
            .collect(Collectors.toSet());
        Assertions.assertEquals(expLogFilesToBeRenamed, gotLogFilesToBeRenamed, "Log files belonging to file-slices created because of compaction request must be renamed");
        validateRenameFiles(renameFiles, ingestionInstant, compactionInstant, fsView);

        Map<String, Long> fileIdToCountsBeforeRenaming = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, compactionInstant)
            .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
            .filter(fs -> fs.getFileId().equals(op.getFileId()))
            .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

        client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
        metaClient = HoodieTestUtils.createMetaClient(metaClient.getStorageConf(), basePath);
        HoodieTableFileSystemView newFsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());

        // A slice of this file id still based on the compaction instant must be empty.
        newFsView.getLatestFileSlicesBeforeOrOn(partition, compactionInstant, true)
            .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
            .filter(fs -> fs.getFileId().equals(op.getFileId()))
            .forEach(fs -> {
                Assertions.assertFalse(fs.getBaseFile().isPresent(), "No Data file must be present");
                Assertions.assertEquals(0L, fs.getLogFiles().count(), "No Log Files");
            });

        Map<String, Long> fileIdToCountsAfterRenaming = newFsView.getAllFileGroups(partition)
            .flatMap(HoodieFileGroup::getAllFileSlices)
            .filter(fs -> fs.getBaseInstantTime().equals(ingestionInstant))
            .filter(fs -> fs.getFileId().equals(op.getFileId()))
            .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().count()))
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        Assertions.assertEquals(fileIdToCountsBeforeRenaming, fileIdToCountsAfterRenaming, "Each File Id has same number of log-files");
        Assertions.assertEquals(1, fileIdToCountsAfterRenaming.size(), "Not Empty");
        Assertions.assertEquals(expNumRenames, renameFiles.size(), "Expected number of renamed files");
    }
}

