/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineLayout;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.TestCleaner;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestCleanerInsertAndCleanByCommits
extends SparkClientFunctionalTestHarness {
    private static final Logger LOG = LoggerFactory.getLogger(TestCleanerInsertAndCleanByCommits.class);
    private static final int BATCH_SIZE = 100;
    private static final int PARALLELISM = 2;

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
        this.testInsertAndCleanByCommits((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insert), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, isAsync);
    }

    @Test
    public void testInsertPreppedAndCleanByCommits() throws Exception {
        this.testInsertAndCleanByCommits((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::insertPreppedRecords), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsertPreppedRecords), true, false);
    }

    @Test
    public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
        this.testInsertAndCleanByCommits((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)(client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty())), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsertPreppedRecords), true, false);
    }

    @Test
    public void testBulkInsertAndCleanByCommits() throws Exception {
        this.testInsertAndCleanByCommits((HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::bulkInsert), (HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String>)((HoodieWriterClientTestHarness.Function3)SparkRDDWriteClient::upsert), false, false);
    }

    private void testInsertAndCleanByCommits(HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, HoodieWriterClientTestHarness.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync) throws Exception {
        int maxCommits = 3;
        HoodieWriteConfig cfg = this.getConfigBuilder(true).withCleanConfig(HoodieCleanConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(Boolean.valueOf(isAsync)).retainCommits(maxCommits).build()).withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()).withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()).build();
        try (SparkRDDWriteClient client = this.getHoodieWriteClient(cfg);){
            HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction;
            HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
            HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> function2 = isPreppedAPI ? HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls(this.basePath(), this.storageConf(), this.context(), cfg, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGen).generateInserts(arg_0, arg_1))) : (recordInsertGenWrappedFunction = (arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGen).generateInserts(arg_0, arg_1));
            HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI ? HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls(this.basePath(), this.storageConf(), this.context(), cfg, (HoodieWriterClientTestHarness.Function2<List<HoodieRecord>, String, Integer>)((HoodieWriterClientTestHarness.Function2)(arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGen).generateUniqueUpdates(arg_0, arg_1))) : (arg_0, arg_1) -> ((HoodieTestDataGenerator)dataGen).generateUniqueUpdates(arg_0, arg_1);
            HoodieTableMetaClient metaClient = this.getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
            TestCleaner.insertFirstBigBatchForClientCleanerTest(this.context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
            HashMap<String, List<HoodieWriteStat>> commitWriteStatsMap = new HashMap<String, List<HoodieWriteStat>>();
            for (int i = 0; i < 8; ++i) {
                String newCommitTime = HoodieTestTable.makeNewCommitTime();
                client.startCommitWithTime(newCommitTime);
                List records = (List)recordUpsertGenWrappedFunction.apply((Object)newCommitTime, (Object)100);
                List statuses = ((JavaRDD)upsertFn.apply((Object)client, (Object)this.jsc().parallelize(records, 2), (Object)newCommitTime)).collect();
                Assertions.assertNoWriteErrors((List)statuses);
                commitWriteStatsMap.put(newCommitTime, statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
                metaClient = HoodieTableMetaClient.reload((HoodieTableMetaClient)metaClient);
                this.validateFilesAfterCleaning((HoodieTable)HoodieSparkTable.create((HoodieWriteConfig)cfg, (HoodieEngineContext)this.context(), (HoodieTableMetaClient)metaClient), commitWriteStatsMap, dataGen.getPartitionPaths());
            }
        }
    }

    private void validateFilesAfterCleaning(HoodieTable table, Map<String, List<HoodieWriteStat>> commitWriteStatsMap, String[] partitionPaths) {
        org.junit.jupiter.api.Assertions.assertEquals((Object)HoodieCleaningPolicy.KEEP_LATEST_COMMITS, (Object)table.getConfig().getCleanerPolicy());
        boolean isAsyncClean = table.getConfig().isAsyncClean();
        int maxCommitsToRetain = table.getConfig().getCleanerCommitsRetained();
        HoodieTimeline commitsTimeline = table.getCompletedCommitsTimeline();
        HoodieInstant lastInstant = (HoodieInstant)commitsTimeline.lastInstant().get();
        if (isAsyncClean) {
            commitsTimeline = commitsTimeline.findInstantsBefore(lastInstant.requestedTime());
        }
        Option earliestRetainedCommit = commitsTimeline.nthFromLastInstant(maxCommitsToRetain - 1);
        HoodieTimeline timeline = commitsTimeline;
        HashMap expectedInstantTimeMap = new HashMap();
        TableFileSystemView fsView = table.getFileSystemView();
        HashSet remainingFileGroupSet = new HashSet();
        for (String string : partitionPaths) {
            remainingFileGroupSet.addAll(fsView.getAllFileGroups(string).map(fileGroup -> Pair.of((Object)string, (Object)fileGroup.getFileGroupId().getFileId())).collect(Collectors.toList()));
        }
        for (HoodieInstant instant : commitsTimeline.getReverseOrderedInstants().collect(Collectors.toList())) {
            TimelineLayout layout = TimelineLayout.fromVersion((TimelineLayoutVersion)commitsTimeline.getTimelineLayoutVersion());
            List list = commitWriteStatsMap.computeIfAbsent(instant.requestedTime(), newInstant -> {
                try {
                    HoodieInstant instant1 = (HoodieInstant)timeline.filter(inst -> inst.requestedTime().equals(newInstant)).firstInstant().get();
                    return ((HoodieCommitMetadata)layout.getCommitMetadataSerDe().deserialize(instant1, (byte[])timeline.getInstantDetails(instant1).get(), HoodieCommitMetadata.class)).getWriteStats();
                }
                catch (IOException e) {
                    return Collections.EMPTY_LIST;
                }
            });
            list.forEach(writeStat -> {
                Pair partitionFileIdPair = Pair.of((Object)writeStat.getPartitionPath(), (Object)writeStat.getFileId());
                if (remainingFileGroupSet.contains(partitionFileIdPair)) {
                    if (earliestRetainedCommit.isPresent() && InstantComparison.compareTimestamps((String)instant.requestedTime(), (BiPredicate)InstantComparison.LESSER_THAN, (String)((HoodieInstant)earliestRetainedCommit.get()).requestedTime())) {
                        remainingFileGroupSet.remove(partitionFileIdPair);
                    }
                    expectedInstantTimeMap.computeIfAbsent(partitionFileIdPair, k -> new HashSet()).add(instant.requestedTime());
                }
            });
            if (!remainingFileGroupSet.isEmpty()) continue;
            break;
        }
        for (String string : partitionPaths) {
            List fileGroups = fsView.getAllFileGroups(string).collect(Collectors.toList());
            for (HoodieFileGroup fileGroup2 : fileGroups) {
                HashSet commitTimes = new HashSet();
                fileGroup2.getAllBaseFiles().forEach(value -> {
                    LOG.debug("Data File - " + value);
                    commitTimes.add(value.getCommitTime());
                });
                if (isAsyncClean) {
                    commitTimes.remove(lastInstant.requestedTime());
                }
                org.junit.jupiter.api.Assertions.assertEquals(expectedInstantTimeMap.get(Pair.of((Object)string, (Object)fileGroup2.getFileGroupId().getFileId())), commitTimes, (String)"Only contain acceptable versions of file should be present");
            }
        }
    }
}

