/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.TestCleaner;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;

/**
 * Functional tests that write several batches through a {@link SparkRDDWriteClient} configured
 * with the {@link HoodieCleaningPolicy#KEEP_LATEST_FILE_VERSIONS} cleaning policy and verify,
 * after every upsert, how many base-file versions remain in each file group.
 *
 * <p>Each public test exercises the same scenario with a different pair of insert/upsert write
 * APIs (regular, prepped, bulk insert, bulk insert prepped).
 */
public class TestCleanerInsertAndCleanByVersions
        extends SparkClientFunctionalTestHarness {
    /** Number of records generated for every upsert batch. */
    private static final int BATCH_SIZE = 100;
    /** Parallelism used for the write config and for parallelizing the upsert records. */
    private static final int PARALLELISM = 2;

    /** Runs the scenario with {@code insert} followed by {@code upsert}. */
    @Test
    public void testInsertAndCleanByVersions() throws Exception {
        this.testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
    }

    /** Runs the scenario with {@code insertPreppedRecords} followed by {@code upsertPreppedRecords}. */
    @Test
    public void testInsertPreppedAndCleanByVersions() throws Exception {
        this.testInsertAndCleanByVersions(
                SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, true);
    }

    /** Runs the scenario with {@code bulkInsert} followed by {@code upsert}. */
    @Test
    public void testBulkInsertAndCleanByVersions() throws Exception {
        this.testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
    }

    /** Runs the scenario with {@code bulkInsertPreppedRecords} followed by {@code upsertPreppedRecords}. */
    @Test
    public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
        this.testInsertAndCleanByVersions(
                (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
                SparkRDDWriteClient::upsertPreppedRecords,
                true);
    }

    /**
     * Shared scenario: insert a first batch, schedule a compaction for one file group, then
     * repeatedly upsert and verify the retained base-file versions.
     *
     * @param insertFn     write API used for the initial batch
     * @param upsertFn     write API used for every subsequent batch
     * @param isPreppedAPI when {@code true}, the record generators are wrapped via
     *                     {@code HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls}
     * @throws Exception if any write, timeline or metadata operation fails
     */
    private void testInsertAndCleanByVersions(
            HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
            HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn,
            boolean isPreppedAPI) throws Exception {
        // Number of file versions the cleaner is configured to retain.
        int maxVersions = 2;
        HoodieWriteConfig cfg = this.getConfigBuilder(true)
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
                        .retainFileVersions(maxVersions)
                        .build())
                .withParallelism(PARALLELISM, PARALLELISM)
                .withBulkInsertParallelism(PARALLELISM)
                .withFinalizeWriteParallelism(PARALLELISM)
                .withDeleteParallelism(PARALLELISM)
                .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
                .build();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg)) {
            final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
            // Both generators must be assigned on BOTH branches: the prepped variant wraps the
            // raw generator, the regular variant uses it directly.
            final HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
                    ? HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls(
                            this.basePath(), this.storageConf(), this.context(), cfg, dataGen::generateInserts)
                    : dataGen::generateInserts;
            final HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
                    ? HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls(
                            this.basePath(), this.storageConf(), this.context(), cfg, dataGen::generateUniqueUpdates)
                    : dataGen::generateUniqueUpdates;

            HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
            TestCleaner.insertFirstBigBatchForClientCleanerTest(
                    this.context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);

            HashMap<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
            metaClient = HoodieTableMetaClient.reload(metaClient);
            HoodieSparkTable table = HoodieSparkTable.create(cfg, this.context(), metaClient);
            for (String partitionPath : dataGen.getPartitionPaths()) {
                TableFileSystemView fsView = table.getFileSystemView();
                Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {
                    fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
                    return true;
                }));
                if (added.isPresent()) {
                    // Stop at the first partition that has a file group: only one is selected.
                    break;
                }
            }

            // Build a compaction plan from the selected file slice and save it as a requested
            // compaction at the first generated instant time.
            List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
                    .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue()))
                    .collect(Collectors.toList());
            HoodieCompactionPlan compactionPlan =
                    CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty());
            List<String> instantTimes = HoodieTestTable.makeIncrementalCommitTimes(9, 1, 10);
            String compactionTime = instantTimes.get(0);
            table.getActiveTimeline().saveToCompactionRequested(
                    new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", compactionTime),
                    TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));

            // The remaining instant times drive the upsert rounds.
            instantTimes = instantTimes.subList(1, instantTimes.size());
            for (String newInstantTime : instantTimes) {
                client.startCommitWithTime(newInstantTime);
                List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, BATCH_SIZE);
                List<WriteStatus> statuses =
                        upsertFn.apply(client, this.jsc().parallelize(records, PARALLELISM), newInstantTime).collect();
                Assertions.assertNoWriteErrors(statuses);

                // Reload so the table and its views reflect the commit that was just written.
                metaClient = HoodieTableMetaClient.reload(metaClient);
                table = HoodieSparkTable.create(cfg, this.context(), metaClient);
                HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
                TableFileSystemView fsView = table.getFileSystemView();

                for (String partitionPath : dataGen.getPartitionPaths()) {
                    // Collect, per file id, the sorted commit times of every version recorded in
                    // the commit metadata of the timeline for this partition.
                    HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
                    for (HoodieInstant entry : timeline.getInstants()) {
                        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                                timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class);
                        for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
                            fileIdToVersions
                                    .computeIfAbsent(wstat.getFileId(), unused -> new TreeSet<>())
                                    .add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName()));
                        }
                    }

                    List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
                    for (HoodieFileGroup fileGroup : fileGroups) {
                        if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
                            // The base file of the slice selected for compaction must still exist.
                            String compactionBaseInstant =
                                    compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime();
                            Option<HoodieBaseFile> dataFileForCompactionPresent = Option.fromJavaOptional(
                                    fileGroup.getAllBaseFiles()
                                            .filter(df -> compactionBaseInstant.equals(df.getCommitTime()))
                                            .findAny());
                            org.junit.jupiter.api.Assertions.assertTrue(
                                    dataFileForCompactionPresent.isPresent(),
                                    "Data File selected for compaction is retained");
                        } else {
                            // Every other file group may keep at most maxVersions base files ...
                            String fileId = fileGroup.getFileGroupId().getFileId();
                            List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());
                            org.junit.jupiter.api.Assertions.assertTrue(
                                    dataFiles.size() <= maxVersions,
                                    "fileId " + fileId + " has more than " + maxVersions + " versions");

                            // ... and the i-th base file must match the i-th most recent committed version.
                            List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
                            for (int i = 0; i < dataFiles.size(); ++i) {
                                org.junit.jupiter.api.Assertions.assertEquals(
                                        dataFiles.get(i).getCommitTime(),
                                        commitedVersions.get(commitedVersions.size() - 1 - i),
                                        "File " + fileId + " does not have latest versions on commits" + commitedVersions);
                            }
                        }
                    }
                }
            }
        }
    }
}

