/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.AbstractHoodieWriteClient;
import org.apache.hudi.HoodieCleanClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.TableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.func.BulkInsertMapFunction;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.HoodieCommitArchiveLog;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

public class HoodieWriteClient<T extends HoodieRecordPayload>
extends AbstractHoodieWriteClient<T> {
    /** Logger shared by all instances of this client. */
    private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
    /** Label passed to {@code HoodieMetrics.updateIndexMetrics} when reporting index lookup time. */
    private static final String LOOKUP_STR = "lookup";
    /** When true, {@code startCommit()} and {@code startCommitWithTime()} call {@code rollbackPendingCommits()} first. */
    private final boolean rollbackPending;
    /** Metrics helper used for index and rollback timings; not serialized. */
    private final transient HoodieMetrics metrics;
    /** Client that the {@code clean(...)} methods delegate to; closed in {@link #close()}. */
    private final transient HoodieCleanClient<T> cleanClient;
    /**
     * Timer context for compaction.
     * NOTE(review): not referenced in the visible part of this class; presumably used by the
     * compaction methods further down the file. Confirm.
     */
    private transient Timer.Context compactionTimer;

    /**
     * Creates a write client with {@code rollbackPending} set to {@code false}.
     *
     * @param jsc          Spark context handed to the other constructors
     * @param clientConfig write configuration for this client
     */
    public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
        this(jsc, clientConfig, false);
    }

    /**
     * Creates a write client whose index is built via {@code HoodieIndex.createIndex(clientConfig, jsc)}.
     *
     * @param jsc             Spark context
     * @param clientConfig    write configuration for this client
     * @param rollbackPending whether pending commits are rolled back before a new commit is started
     */
    public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) {
        this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
    }

    /**
     * Creates a write client with an explicitly supplied index and no timeline service
     * ({@code Option.empty()} is passed on). Package-private; marked as visible for testing.
     *
     * @param jsc             Spark context
     * @param clientConfig    write configuration for this client
     * @param rollbackPending whether pending commits are rolled back before a new commit is started
     * @param index           index implementation to use
     */
    @VisibleForTesting
    HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {
        this(jsc, clientConfig, rollbackPending, index, Option.empty());
    }

    public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index, Option<EmbeddedTimelineService> timelineService) {
        super(jsc, index, clientConfig, timelineService);
        this.metrics = new HoodieMetrics(this.config, this.config.getTableName());
        this.rollbackPending = rollbackPending;
        this.cleanClient = new HoodieCleanClient(jsc, this.config, this.metrics, timelineService);
    }

    /**
     * Registers {@code HoodieWriteConfig}, {@code HoodieRecord} and {@code HoodieKey} as Kryo
     * classes on the given Spark configuration.
     *
     * @param conf Spark configuration to modify
     * @return the same {@code conf} instance that was passed in
     */
    public static SparkConf registerClasses(SparkConf conf) {
        final Class<?>[] kryoClasses = {HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class};
        conf.registerKryoClasses(kryoClasses);
        return conf;
    }

    /**
     * Tags the given records through the index and keeps only those whose current location is
     * not known. The index lookup duration is reported under the {@code "lookup"} label.
     *
     * @param hoodieRecords records to check against the index
     * @return the records for which {@code isCurrentLocationKnown()} is false after tagging
     */
    public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
        HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        Timer.Context lookupTimer = metrics.getIndexCtx();
        JavaRDD<HoodieRecord<T>> tagged = getIndex().tagLocation(hoodieRecords, jsc, hoodieTable);
        // The timer context may be null; report zero in that case.
        long elapsed = lookupTimer == null ? 0L : lookupTimer.stop();
        metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(elapsed));
        return tagged.filter(candidate -> !candidate.isCurrentLocationKnown());
    }

    /**
     * Upserts a batch of records: optionally de-duplicates them, tags them through the index,
     * then hands them to {@code upsertRecordsInternal} in upsert mode.
     *
     * @param records    records to upsert
     * @param commitTime instant time of the commit being written
     * @return write statuses produced by the write
     * @throws HoodieUpsertException on any failure; a {@code HoodieUpsertException} raised
     *                               internally is rethrown as-is, anything else is wrapped
     */
    public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String commitTime) {
        HoodieTable<T> table = getTableAndInitCtx(AbstractHoodieWriteClient.OperationType.UPSERT);
        try {
            // De-duplication only happens when the config enables combine-before-upsert.
            JavaRDD<HoodieRecord<T>> uniqueRecords = combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
            Timer.Context lookupTimer = metrics.getIndexCtx();
            JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(uniqueRecords, jsc, table);
            long elapsed = lookupTimer == null ? 0L : lookupTimer.stop();
            metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(elapsed));
            return upsertRecordsInternal(recordsWithLocation, commitTime, table, true);
        } catch (HoodieUpsertException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Throwable cause) {
            throw new HoodieUpsertException("Failed to upsert for commit time " + commitTime, cause);
        }
    }

    /**
     * Upserts records that the caller has already prepared. Unlike {@code upsert}, this method
     * passes the records straight to {@code upsertRecordsInternal} with no de-duplication or
     * index tagging of its own.
     *
     * @param preppedRecords prepared records to upsert
     * @param commitTime     instant time of the commit being written
     * @return write statuses produced by the write
     * @throws HoodieUpsertException on any failure; non-{@code HoodieUpsertException} causes are wrapped
     */
    public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String commitTime) {
        HoodieTable<T> table = getTableAndInitCtx(AbstractHoodieWriteClient.OperationType.UPSERT_PREPPED);
        try {
            return upsertRecordsInternal(preppedRecords, commitTime, table, true);
        } catch (HoodieUpsertException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Throwable cause) {
            throw new HoodieUpsertException("Failed to upsert prepared records for commit time " + commitTime, cause);
        }
    }

    /**
     * Inserts a batch of records: optionally de-duplicates them, then hands them to
     * {@code upsertRecordsInternal} in insert mode (no index tagging is done here).
     *
     * @param records    records to insert
     * @param commitTime instant time of the commit being written
     * @return write statuses produced by the write
     * @throws HoodieInsertException on any failure; non-{@code HoodieInsertException} causes are wrapped
     */
    public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, String commitTime) {
        HoodieTable<T> table = getTableAndInitCtx(AbstractHoodieWriteClient.OperationType.INSERT);
        try {
            JavaRDD<HoodieRecord<T>> uniqueRecords = combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
            return upsertRecordsInternal(uniqueRecords, commitTime, table, false);
        } catch (HoodieInsertException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Throwable cause) {
            throw new HoodieInsertException("Failed to insert for commit time " + commitTime, cause);
        }
    }

    /**
     * Inserts records that the caller has already prepared, passing them directly to
     * {@code upsertRecordsInternal} in insert mode.
     *
     * @param preppedRecords prepared records to insert
     * @param commitTime     instant time of the commit being written
     * @return write statuses produced by the write
     * @throws HoodieInsertException on any failure; non-{@code HoodieInsertException} causes are wrapped
     */
    public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String commitTime) {
        HoodieTable<T> table = getTableAndInitCtx(AbstractHoodieWriteClient.OperationType.INSERT_PREPPED);
        try {
            return upsertRecordsInternal(preppedRecords, commitTime, table, false);
        } catch (HoodieInsertException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Throwable cause) {
            throw new HoodieInsertException("Failed to insert prepared records for commit time " + commitTime, cause);
        }
    }

    /**
     * Bulk-inserts records without a user-defined partitioner; delegates to the three-argument
     * overload with {@code Option.empty()}.
     *
     * @param records    records to bulk insert
     * @param commitTime instant time of the commit being written
     * @return write statuses produced by the write
     */
    public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String commitTime) {
        return this.bulkInsert(records, commitTime, Option.empty());
    }

    /**
     * Bulk-inserts records: optionally de-duplicates them (driven by the combine-before-insert
     * setting) and then hands them to {@code bulkInsertInternal}.
     *
     * @param records               records to bulk insert
     * @param commitTime            instant time of the commit being written
     * @param bulkInsertPartitioner optional custom partitioner used to lay out records before writing
     * @return write statuses produced by the write
     * @throws HoodieInsertException on any failure; non-{@code HoodieInsertException} causes are wrapped
     */
    public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String commitTime, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
        HoodieTable<T> table = getTableAndInitCtx(AbstractHoodieWriteClient.OperationType.BULK_INSERT);
        try {
            JavaRDD<HoodieRecord<T>> uniqueRecords = combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
            return bulkInsertInternal(uniqueRecords, commitTime, table, bulkInsertPartitioner);
        } catch (HoodieInsertException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Throwable cause) {
            throw new HoodieInsertException("Failed to bulk insert for commit time " + commitTime, cause);
        }
    }

    /**
     * Bulk-inserts records that the caller has already prepared, passing them directly to
     * {@code bulkInsertInternal}.
     *
     * @param preppedRecords        prepared records to bulk insert
     * @param commitTime            instant time of the commit being written
     * @param bulkInsertPartitioner optional custom partitioner used to lay out records before writing
     * @return write statuses produced by the write
     * @throws HoodieInsertException on any failure; non-{@code HoodieInsertException} causes are wrapped
     */
    public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String commitTime, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
        HoodieTable<T> table = getTableAndInitCtx(AbstractHoodieWriteClient.OperationType.BULK_INSERT_PREPPED);
        try {
            return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner);
        } catch (HoodieInsertException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Throwable cause) {
            throw new HoodieInsertException("Failed to bulk insert prepared records for commit time " + commitTime, cause);
        }
    }

    /**
     * Deletes the records identified by the given keys. Each key is wrapped in a record with an
     * {@code EmptyHoodieRecordPayload}, tagged through the index, and only records whose location
     * is known are written via {@code upsertRecordsInternal}. When no key matches an existing
     * record, an empty workload profile is saved to the inflight instant and an empty result is
     * committed (subject to auto-commit).
     *
     * <p>The index lookup duration is reported on both paths; previously it was only reported
     * when at least one key existed, leaving the timer unreported for the all-keys-missing case.
     *
     * @param keys2      keys of the records to delete
     * @param commitTime instant time of the commit being written
     * @return write statuses produced by the write (empty when no key exists)
     * @throws HoodieUpsertException on any failure; non-{@code HoodieUpsertException} causes are wrapped
     */
    public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys2, String commitTime) {
        HoodieTable<T> table = getTableAndInitCtx(AbstractHoodieWriteClient.OperationType.DELETE);
        try {
            JavaRDD<HoodieKey> dedupedKeys = config.shouldCombineBeforeDelete() ? deduplicateKeys(keys2) : keys2;
            JavaRDD<HoodieRecord<T>> dedupedRecords = dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
            Timer.Context indexTimer = metrics.getIndexCtx();
            JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
            JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
            // Evaluate emptiness before stopping the timer, exactly as the non-empty path always did,
            // so the reported duration keeps covering the same work.
            boolean anyKeyExists = !taggedValidRecords.isEmpty();
            metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
            if (anyKeyExists) {
                return upsertRecordsInternal(taggedValidRecords, commitTime, table, true);
            }
            // None of the keys exist: record an empty workload and commit an empty result.
            saveWorkloadProfileMetadataToInflight(new WorkloadProfile(jsc.emptyRDD()), table, commitTime);
            JavaRDD<WriteStatus> writeStatusRDD = jsc.emptyRDD();
            commitOnAutoCommit(commitTime, writeStatusRDD, table.getMetaClient().getCommitActionType());
            return writeStatusRDD;
        } catch (HoodieUpsertException alreadyWrapped) {
            throw alreadyWrapped;
        } catch (Throwable cause) {
            throw new HoodieUpsertException("Failed to delete for commit time " + commitTime, cause);
        }
    }

    /**
     * Shared implementation of the bulk-insert paths. Repartitions the records (with the custom
     * partitioner when present, otherwise by sorting on {@code "partitionPath+recordKey"}),
     * moves the requested instant to inflight, writes each partition with
     * {@code BulkInsertMapFunction}, and finally updates the index / commits if needed.
     *
     * @param dedupedRecords        records to write
     * @param commitTime            instant time of the commit being written
     * @param table                 table being written to
     * @param bulkInsertPartitioner optional custom partitioner
     * @return write statuses after index update (and commit, if it applies)
     */
    private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime, HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
        final int parallelism = config.getBulkInsertShuffleParallelism();
        final JavaRDD<HoodieRecord<T>> repartitionedRecords;
        if (bulkInsertPartitioner.isPresent()) {
            repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
        } else {
            // Default layout: ascending sort on a "partitionPath+recordKey" string.
            repartitionedRecords = dedupedRecords.sortBy(
                record -> String.format("%s+%s", record.getPartitionPath(), record.getRecordKey()), true, parallelism);
        }
        // One fresh file-id prefix per output partition.
        final List<String> fileIDPrefixes = IntStream.range(0, parallelism)
            .mapToObj(i -> FSUtils.createNewFileIdPfx())
            .collect(Collectors.toList());
        HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, table.getMetaClient().getCommitActionType(), commitTime);
        table.getActiveTimeline().transitionRequestedToInflight(requested, Option.empty());
        JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
            .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
            .flatMap(List::iterator);
        return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
    }

    /**
     * Returns the records de-duplicated via {@code deduplicateRecords} when {@code condition}
     * is true, and the untouched input otherwise.
     *
     * @param condition   whether to de-duplicate
     * @param records     input records
     * @param parallelism parallelism passed to {@code deduplicateRecords}
     * @return de-duplicated records, or {@code records} itself
     */
    private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, JavaRDD<HoodieRecord<T>> records, int parallelism) {
        if (!condition) {
            return records;
        }
        return deduplicateRecords(records, parallelism);
    }

    /**
     * Builds commit metadata from the workload profile (one write stat per update location in
     * each partition) and stores it, as UTF-8 JSON, while transitioning the requested instant
     * for {@code commitTime} to inflight.
     *
     * @param profile    workload profile describing the pending updates
     * @param table      table being written to
     * @param commitTime instant time of the commit being written
     * @throws HoodieCommitException if the metadata cannot be serialized or saved
     */
    private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, HoodieTable<T> table, String commitTime) throws HoodieCommitException {
        try {
            final HoodieCommitMetadata metadata = new HoodieCommitMetadata();
            for (Object path : profile.getPartitionPaths()) {
                final String partitionPath = path.toString();
                WorkloadStat partitionStat = profile.getWorkloadStat(partitionPath);
                partitionStat.getUpdateLocationToCount().forEach((fileId, prevCommitAndCount) -> {
                    HoodieWriteStat writeStat = new HoodieWriteStat();
                    writeStat.setFileId(fileId);
                    writeStat.setPrevCommit(prevCommitAndCount.getKey());
                    writeStat.setNumUpdateWrites(prevCommitAndCount.getValue());
                    metadata.addWriteStat(partitionPath, writeStat);
                });
            }
            HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
            HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, table.getMetaClient().getCommitActionType(), commitTime);
            byte[] serializedMetadata = metadata.toJsonString().getBytes(StandardCharsets.UTF_8);
            activeTimeline.transitionRequestedToInflight(requested, Option.of(serializedMetadata));
        } catch (IOException io) {
            throw new HoodieCommitException("Failed to commit " + commitTime + " unable to save inflight metadata ", io);
        }
    }

    /**
     * Shared implementation of the insert and upsert paths. Persists the input RDD if it is
     * not persisted yet, builds and saves a workload profile when the table needs one,
     * partitions the records, writes each partition through the table's upsert or insert
     * handler, and finally updates the index / commits if needed.
     *
     * @param preppedRecords records ready to be written
     * @param commitTime     instant time of the commit being written
     * @param hoodieTable    table being written to
     * @param isUpsert       true to use the upsert partitioner and handler, false for insert
     * @return write statuses after index update (and commit, if it applies)
     */
    private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String commitTime, HoodieTable<T> hoodieTable, boolean isUpsert) {
        StorageLevel currentLevel = preppedRecords.getStorageLevel();
        if (currentLevel != StorageLevel.NONE()) {
            LOG.info("RDD PreppedRecords was persisted at: " + currentLevel);
        } else {
            preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
        }

        // The profile stays null when the table does not need one.
        WorkloadProfile<T> profile = null;
        if (hoodieTable.isWorkloadProfileNeeded()) {
            profile = new WorkloadProfile<T>(preppedRecords);
            LOG.info("Workload profile :" + profile);
            saveWorkloadProfileMetadataToInflight(profile, hoodieTable, commitTime);
        }

        final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
        JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
        JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords
            .mapPartitionsWithIndex((partition, recordItr) -> isUpsert
                ? hoodieTable.handleUpsertPartition(commitTime, partition, recordItr, partitioner)
                : hoodieTable.handleInsertPartition(commitTime, partition, recordItr, partitioner), true)
            .flatMap(List::iterator);
        return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime);
    }

    /**
     * Picks the table's upsert or insert partitioner for the given workload profile.
     *
     * @param table    table providing the partitioners
     * @param isUpsert true for the upsert partitioner, false for the insert partitioner
     * @param profile  workload profile handed to the table (may be null, as passed by the caller)
     * @return the selected partitioner
     */
    private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
        return isUpsert ? table.getUpsertPartitioner(profile) : table.getInsertPartitioner(profile);
    }

    /**
     * Redistributes records with the given partitioner. Each record is keyed by the pair
     * (record key, optional current location), partitioned, and then unwrapped again.
     *
     * @param dedupedRecords records to redistribute
     * @param partitioner    partitioner deciding the target partition of each keyed record
     * @return the same records, laid out according to {@code partitioner}
     */
    private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
        return dedupedRecords
            .mapToPair(record -> new Tuple2<>(
                new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
            .partitionBy(partitioner)
            .map(keyedRecord -> keyedRecord._2());
    }

    /**
     * Post-commit hook: records whether inline compaction is enabled in the commit metadata
     * (and runs {@code forceCompact} when it is), archives commits if required, and runs the
     * cleaner when auto-clean is enabled.
     *
     * @param metadata      commit metadata; receives the {@code hoodie.compact.inline} entry
     * @param instantTime   instant time of the commit, passed to the cleaner
     * @param extraMetadata optional extra metadata forwarded to {@code forceCompact}
     * @throws IOException declared by the overridden method and propagated from the steps above
     */
    @Override
    protected void postCommit(HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) throws IOException {
        final boolean inlineCompaction = config.isInlineCompaction();
        metadata.addMetadata("hoodie.compact.inline", inlineCompaction ? "true" : "false");
        if (inlineCompaction) {
            forceCompact(extraMetadata);
        }

        HoodieCommitArchiveLog archiver = new HoodieCommitArchiveLog(config, createMetaClient(true));
        archiver.archiveIfRequired(jsc);

        if (!config.isAutoClean()) {
            LOG.info("Auto cleaning is not enabled. Not running cleaner now");
            return;
        }
        LOG.info("Auto cleaning is enabled. Running cleaner now");
        clean(instantTime);
    }

    /**
     * Savepoints the latest completed commit by delegating to
     * {@link #savepoint(String, String, String)}.
     *
     * @param user    user creating the savepoint
     * @param comment free-form comment stored with the savepoint
     * @return the result of the delegated savepoint call
     * @throws HoodieSavepointException      if the completed commit timeline is empty
     * @throws UnsupportedOperationException if the table type is {@code MERGE_ON_READ}
     */
    public boolean savepoint(String user, String comment) {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        if (table.getCompletedCommitsTimeline().empty()) {
            throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
        }
        boolean mergeOnRead = table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ;
        if (mergeOnRead) {
            throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
        }
        HoodieInstant newestCommit = table.getCompletedCommitsTimeline().lastInstant().get();
        String latestCommit = newestCommit.getTimestamp();
        LOG.info("Savepointing latest commit " + latestCommit);
        return savepoint(latestCommit, user, comment);
    }

    /**
     * Savepoints the given completed commit. The commit must not be older than the earliest
     * commit retained by the last clean (or, when no clean has completed, the first completed
     * commit). The latest base files on or before {@code commitTime} are collected for every
     * partition and stored in the savepoint metadata.
     *
     * @param commitTime instant time of the commit to savepoint
     * @param user       user creating the savepoint
     * @param comment    free-form comment stored with the savepoint
     * @return {@code true} once the savepoint has been written
     * @throws UnsupportedOperationException if the table type is {@code MERGE_ON_READ}
     * @throws HoodieSavepointException      if the commit does not exist or an I/O error occurs
     * @throws IllegalArgumentException      if the commit is older than the last retained commit
     */
    public boolean savepoint(String commitTime, String user, String comment) {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
            throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
        }
        Option<HoodieInstant> lastClean = table.getCompletedCleanTimeline().lastInstant();
        HoodieInstant commitInstant = new HoodieInstant(false, "commit", commitTime);
        boolean commitExists = table.getCompletedCommitsTimeline().containsInstant(commitInstant);
        if (!commitExists) {
            throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
        }
        try {
            // Lower bound of what may be savepointed: anything older may already have been cleaned.
            final String lastCommitRetained = lastClean.isPresent()
                ? AvroUtils.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(lastClean.get()).get()).getEarliestCommitToRetain()
                : table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
            boolean withinWindow = HoodieTimeline.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL);
            Preconditions.checkArgument(withinWindow, "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + lastCommitRetained);

            List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
            Map<String, List<String>> latestFilesMap = jsc.parallelize(partitionPaths)
                .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
                    LOG.info("Collecting latest files in partition path " + partitionPath);
                    TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
                    List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime)
                        .map(HoodieBaseFile::getFileName)
                        .collect(Collectors.toList());
                    return new Tuple2<>(partitionPath, latestFiles);
                })
                .collectAsMap();

            HoodieSavepointMetadata metadata = AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
            // Create the inflight savepoint instant, then complete it with the serialized metadata.
            table.getActiveTimeline().createNewInstant(new HoodieInstant(true, "savepoint", commitTime));
            table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, "savepoint", commitTime), AvroUtils.serializeSavepointMetadata(metadata));
            LOG.info("Savepoint " + commitTime + " created");
            return true;
        } catch (IOException e) {
            throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
        }
    }

    /**
     * Deletes a completed savepoint by reverting it to inflight and then deleting the inflight
     * instant. Logs a warning and does nothing when no such savepoint exists.
     *
     * @param savepointTime instant time of the savepoint to delete
     * @throws UnsupportedOperationException if the table type is {@code MERGE_ON_READ}
     */
    public void deleteSavepoint(String savepointTime) {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        boolean mergeOnRead = table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ;
        if (mergeOnRead) {
            throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
        }
        HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
        HoodieInstant completedSavepoint = new HoodieInstant(false, "savepoint", savepointTime);
        if (table.getCompletedSavepointTimeline().containsInstant(completedSavepoint)) {
            activeTimeline.revertToInflight(completedSavepoint);
            activeTimeline.deleteInflight(new HoodieInstant(true, "savepoint", savepointTime));
            LOG.info("Savepoint " + savepointTime + " deleted");
        } else {
            LOG.warn("No savepoint present " + savepointTime);
        }
    }

    /**
     * Deletes a compaction that is still in the requested state.
     *
     * <p>The guard against later commits used to read
     * {@code commitTimeline.empty() && !findInstantsAfter(...).empty()}, which can never be true
     * (an empty timeline has no instants after any time), so the check was dead code. It now
     * fires whenever completed commits exist after {@code compactionTime}, as its error message
     * describes.
     *
     * @param compactionTime instant time of the requested compaction to delete
     * @throws HoodieRollbackException  if completed commits exist after {@code compactionTime}
     * @throws IllegalArgumentException if the compaction is not pending in the requested state
     */
    private void deleteRequestedCompaction(String compactionTime) {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
        HoodieInstant compactionRequestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", compactionTime);
        boolean isCompactionInstantInRequestedState = table.getActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionRequestedInstant);
        HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
        // Later commits must be rolled back before the compaction request can be removed.
        if (!commitTimeline.findInstantsAfter(compactionTime, Integer.MAX_VALUE).empty()) {
            throw new HoodieRollbackException("Found commits after time :" + compactionTime + ", please rollback greater commits first");
        }
        if (!isCompactionInstantInRequestedState) {
            throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime);
        }
        activeTimeline.deleteCompactionRequested(compactionRequestedInstant);
        LOG.info("Compaction " + compactionTime + " deleted");
    }

    /**
     * Restores the table to the given savepoint via {@link #restoreToInstant(String)} and then
     * verifies that the savepointed instant is the last completed commit/compaction instant.
     *
     * <p>The verification message previously concatenated the savepoint time directly with
     * {@code "is not the last commit..."}; a separating space has been added.
     *
     * @param savepointTime instant time of the savepoint to roll back to
     * @return {@code true} when the restore and verification succeed
     * @throws HoodieRollbackException  if no completed savepoint exists for {@code savepointTime}
     * @throws IllegalArgumentException if, after restoring, the last instant is missing or is not
     *                                  {@code savepointTime}
     */
    public boolean rollbackToSavepoint(String savepointTime) {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
        HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
        HoodieInstant savePoint = new HoodieInstant(false, "savepoint", savepointTime);
        if (!table.getCompletedSavepointTimeline().containsInstant(savePoint)) {
            throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime);
        }
        // Collected for logging and for the verification message below.
        List<String> commitsToRollback = commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE)
            .getInstants()
            .map(HoodieInstant::getTimestamp)
            .collect(Collectors.toList());
        LOG.info("Rolling back commits " + commitsToRollback);
        restoreToInstant(savepointTime);
        // Reload the timeline so the check sees the state after the restore.
        Option<HoodieInstant> lastInstant = activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
        Preconditions.checkArgument(lastInstant.isPresent());
        Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime), savepointTime + " is not the last commit after rolling back " + commitsToRollback + ", last commit was " + lastInstant.get().getTimestamp());
        return true;
    }

    /**
     * Rolls back the given commit by delegating to {@code rollbackInternal}.
     *
     * @param commitTime instant time of the commit to roll back
     * @return always {@code true} when {@code rollbackInternal} returns normally
     * @throws HoodieRollbackException declared for callers; raised by the delegated rollback
     */
    public boolean rollback(String commitTime) throws HoodieRollbackException {
        this.rollbackInternal(commitTime);
        return true;
    }

    /**
     * Restores the table to {@code instantTime} by rolling back, newest first, every commit,
     * delta commit and compaction instant with a greater timestamp. A new inflight
     * {@code restore} instant is created up front and completed by {@code finishRestore}.
     *
     * @param instantTime instant time to restore the table to
     * @throws HoodieRollbackException  if rolling back an instant or finishing the restore hits an I/O error
     * @throws IllegalArgumentException if an instant to roll back has an unexpected action
     */
    public void restoreToInstant(String instantTime) throws HoodieRollbackException {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        List<HoodieInstant> instantsToRollback = table.getActiveTimeline()
            .getCommitsAndCompactionTimeline()
            .getReverseOrderedInstants()
            .filter(candidate -> HoodieActiveTimeline.GREATER.test(candidate.getTimestamp(), instantTime))
            .collect(Collectors.toList());
        String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
        Timer.Context context = startContext();
        ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
        table.getActiveTimeline().createNewInstant(new HoodieInstant(true, "restore", startRollbackInstant));

        for (HoodieInstant instant : instantsToRollback) {
            try {
                final String action = instant.getAction();
                switch (action) {
                    case "commit":
                    case "deltacommit":
                    case "compaction": {
                        List<HoodieRollbackStat> rollbackStats = doRollbackAndGetStats(instant);
                        instantsToStats.put(instant.getTimestamp(), rollbackStats);
                        // Only compaction rollbacks are logged here.
                        if ("compaction".equals(action)) {
                            LOG.info("Deleted compaction instant " + instant);
                        }
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("invalid action name " + instant.getAction());
                    }
                }
            } catch (IOException io) {
                throw new HoodieRollbackException("unable to rollback instant " + instant, io);
            }
        }

        try {
            List<String> rolledBackTimes = instantsToRollback.stream()
                .map(HoodieInstant::getTimestamp)
                .collect(Collectors.toList());
            finishRestore(context, instantsToStats.build(), rolledBackTimes, startRollbackInstant, instantTime);
        } catch (IOException io) {
            throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
        }
    }

    /**
     * Returns the rollback timer context from the metrics helper.
     * Callers in this class null-check the result before stopping it.
     *
     * @return the context returned by {@code metrics.getRollbackCtx()}
     */
    private Timer.Context startContext() {
        return this.metrics.getRollbackCtx();
    }

    /**
     * Completes a restore: reports rollback metrics, writes the restore metadata as a completed
     * {@code restore} instant, and deletes older restore meta files when the cleaner timeline is
     * not empty.
     *
     * <p>The number of deleted files is now summed over all rolled-back instants. Previously the
     * running total was overwritten on every loop iteration, so only the last instant's count
     * was reported.
     *
     * @param context           rollback timer context; may be null, in which case no metrics are reported
     * @param commitToStats     rollback stats keyed by the instant time they belong to
     * @param commitsToRollback instant times that were rolled back
     * @param startRestoreTime  instant time of the restore instant being completed
     * @param restoreToInstant  instant time the table was restored to (used for logging)
     * @throws IOException if serializing or saving the restore metadata fails
     */
    private void finishRestore(Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats, List<String> commitsToRollback, String startRestoreTime, String restoreToInstant) throws IOException {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        Option<Long> durationInMs = Option.empty();
        // Total successfully deleted files across every rolled-back instant.
        long numFilesDeleted = commitToStats.values().stream()
            .flatMap(List::stream)
            .mapToLong(stat -> stat.getSuccessDeleteFiles().size())
            .sum();
        if (context != null) {
            durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
            metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
        }
        HoodieRestoreMetadata restoreMetadata = AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
        table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, "restore", startRestoreTime), AvroUtils.serializeRestoreMetadata(restoreMetadata));
        LOG.info("Commits " + commitsToRollback + " rollback is complete. Restored table to " + restoreToInstant);
        // NOTE(review): the guard inspects the cleaner timeline while the cleanup targets the
        // restore timeline; behavior is kept as-is, but confirm this is intended.
        if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
            LOG.info("Cleaning up older restore meta files");
            FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), table.getActiveTimeline().getRestoreTimeline().getInstants());
        }
    }

    /**
     * Closes this client: first the superclass resources, then the clean client.
     */
    @Override
    public void close() {
        super.close();
        this.cleanClient.close();
    }

    /**
     * Runs the cleaner by delegating to the clean client.
     *
     * @throws HoodieIOException declared for callers; raised by the delegated clean
     */
    public void clean() throws HoodieIOException {
        this.cleanClient.clean();
    }

    /**
     * Runs the cleaner for the given start time by delegating to the clean client.
     *
     * @param startCleanTime instant time passed through to the clean client
     * @return the clean metadata returned by the clean client
     * @throws HoodieIOException declared for callers; raised by the delegated clean
     */
    protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
        return this.cleanClient.clean(startCleanTime);
    }

    /**
     * Starts a new commit at a freshly generated instant time.
     * Pending commits are rolled back first when this client was created with
     * {@code rollbackPending == true}.
     *
     * @return the instant time of the commit that was started
     */
    public String startCommit() {
        if (this.rollbackPending) {
            this.rollbackPendingCommits();
        }
        String commitTime = HoodieActiveTimeline.createNewInstantTime();
        this.startCommit(commitTime);
        return commitTime;
    }

    /**
     * Starts a new commit at the caller-supplied instant time.
     * Pending commits are rolled back first when this client was created with
     * {@code rollbackPending == true}.
     *
     * @param instantTime instant time to use for the new commit
     */
    public void startCommitWithTime(String instantTime) {
        if (this.rollbackPending) {
            this.rollbackPendingCommits();
        }
        this.startCommit(instantTime);
    }

    /**
     * Creates the requested instant for a new commit. If a pending compaction exists, its
     * instant time must be earlier than {@code instantTime}.
     *
     * @param instantTime instant time of the commit to start
     * @throws IllegalArgumentException if the latest pending compaction is not earlier than {@code instantTime}
     */
    private void startCommit(String instantTime) {
        LOG.info("Generate a new instant time " + instantTime);
        HoodieTableMetaClient metaClient = createMetaClient(true);
        Option<HoodieInstant> newestPendingCompaction = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
        if (newestPendingCompaction.isPresent()) {
            HoodieInstant latestPending = newestPendingCompaction.get();
            boolean compactionIsEarlier = HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER);
            Preconditions.checkArgument(compactionIsEarlier, "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :" + latestPending + ",  Ingesting at " + instantTime);
        }
        HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
        HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
        HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, table.getMetaClient().getCommitActionType(), instantTime);
        activeTimeline.createNewInstant(requested);
    }

    /**
     * Schedules a compaction at a freshly generated instant time.
     *
     * @param extraMetadata optional extra metadata to attach to the compaction plan
     * @return the new instant time when a compaction was scheduled, otherwise an empty option
     * @throws IOException propagated from {@link #scheduleCompactionAtInstant(String, Option)}
     */
    public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
        final String instantTime = HoodieActiveTimeline.createNewInstantTime();
        LOG.info("Generate a new instant time " + instantTime);
        if (scheduleCompactionAtInstant(instantTime, extraMetadata)) {
            return Option.of(instantTime);
        }
        return Option.empty();
    }

    /**
     * Schedules a compaction at the given instant time. The earliest pending non-compaction
     * write must be later than {@code instantTime}, and no commit or compaction instant may have
     * a timestamp greater than or equal to it. When the table produces a plan with at least one
     * operation, the plan is saved as a requested compaction.
     *
     * @param instantTime   instant time to schedule the compaction at
     * @param extraMetadata optional extra metadata to attach to the compaction plan
     * @return {@code true} if a compaction was scheduled, {@code false} if there was nothing to compact
     * @throws IOException              if the compaction plan cannot be serialized or saved
     * @throws IllegalArgumentException if either timeline precondition is violated
     */
    public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws IOException {
        HoodieTableMetaClient metaClient = createMetaClient(true);

        Option<HoodieInstant> earliestPendingWrite = metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
        if (earliestPendingWrite.isPresent()) {
            HoodieInstant earliestInflight = earliestPendingWrite.get();
            boolean writeIsLater = HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER);
            Preconditions.checkArgument(writeIsLater, "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight + ", Compaction scheduled at " + instantTime);
        }

        List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline()
            .getCommitsAndCompactionTimeline()
            .getInstants()
            .filter(existing -> HoodieTimeline.compareTimestamps(existing.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
            .collect(Collectors.toList());
        Preconditions.checkArgument(conflictingInstants.isEmpty(), "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :" + conflictingInstants);

        HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
        HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
        boolean nothingToCompact = workload == null || workload.getOperations() == null || workload.getOperations().isEmpty();
        if (nothingToCompact) {
            return false;
        }
        extraMetadata.ifPresent(workload::setExtraMetadata);
        HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "compaction", instantTime);
        metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant, AvroUtils.serializeCompactionPlan(workload));
        return true;
    }

    /**
     * Runs the compaction scheduled at the given instant time, committing according to the
     * write config's auto-commit setting.
     *
     * @param compactionInstantTime instant time of the scheduled compaction
     * @return write statuses produced by the compaction
     * @throws IOException propagated from the underlying compaction run
     */
    public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException {
        boolean autoCommit = this.config.shouldAutoCommit();
        return this.compact(compactionInstantTime, autoCommit);
    }

    /**
     * Commits a compaction whose write statuses were produced earlier.
     *
     * <p>The metadata written with the commit is the extra metadata stored in the compaction plan,
     * overlaid with the caller-supplied {@code extraMetadata} (caller entries win on key clashes).
     * When the caller supplies none, the plan's metadata (possibly absent) is used as is.
     *
     * @param compactionInstantTime instant time of the compaction to commit
     * @param writeStatuses         write statuses produced by the compaction
     * @param extraMetadata         optional caller-supplied metadata to merge into the commit
     * @throws IOException if the compaction plan cannot be read or deserialized
     */
    public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
        HoodieTableMetaClient metaClient = this.createMetaClient(true);
        HoodieTable table = HoodieTable.getHoodieTable(metaClient, this.config, this.jsc);
        HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
        HoodieInstant requestedInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
        HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(timeline.readCompactionPlanAsBytes(requestedInstant).get());

        Option<Map<String, String>> mergedMetaData;
        if (extraMetadata.isPresent()) {
            Map<String, String> merged = new HashMap<String, String>();
            Map<String, String> planMetadata = compactionPlan.getExtraMetadata();
            if (planMetadata != null) {
                merged.putAll(planMetadata);
            }
            // Caller-supplied entries are applied last so they override the plan's entries.
            merged.putAll(extraMetadata.get());
            mergedMetaData = Option.of(merged);
        } else {
            mergedMetaData = Option.ofNullable(compactionPlan.getExtraMetadata());
        }

        this.commitCompaction(writeStatuses, table, compactionInstantTime, true, mergedMetaData);
    }

    /**
     * Collapses records that share a key into a single record.
     *
     * <p>Records are grouped by record key alone when the index is global, otherwise by the full
     * {@link HoodieKey}. Records in the same group are merged pairwise by calling
     * {@code preCombine} on their payloads.
     *
     * @param records     input records, possibly containing several records per key
     * @param parallelism parallelism passed to the {@code reduceByKey} step
     * @return one record per grouping key
     */
    JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) {
        boolean isIndexingGlobal = this.getIndex().isGlobal();
        return records.mapToPair((PairFunction & Serializable)record -> {
            HoodieKey hoodieKey = record.getKey();
            // Global index: group on the record key only; otherwise group on the whole HoodieKey.
            Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
            return new Tuple2(key, record);
        }).reduceByKey((Function2 & Serializable)(rec1, rec2) -> {
            Object reducedData = rec1.getData().preCombine(rec2.getData());
            // The merged record always keeps the key of the first record of the pair.
            return new HoodieRecord(rec1.getKey(), reducedData);
        }, parallelism).map(Tuple2::_2);
    }

    /**
     * Removes duplicate keys from the input.
     *
     * <p>With a non-global index, keys are de-duplicated with {@code distinct()}. With a global
     * index, keys are grouped by record key and the first key of each pair is kept.
     *
     * @param keys2 keys to de-duplicate
     * @return the de-duplicated keys
     */
    JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys2) {
        if (!this.getIndex().isGlobal()) {
            return keys2.distinct();
        }
        // Global index: two keys with the same record key count as duplicates.
        return keys2.keyBy(HoodieKey::getRecordKey)
            .reduceByKey((Function2 & Serializable)(kept, dropped) -> kept)
            .values();
    }

    /**
     * Rolls back every pending (non-compaction) commit on the commits timeline.
     *
     * <p>The pending instant timestamps are collected up front, in the order produced by
     * {@code getReverseOrderedInstants()}, and each one is passed to {@code rollback(String)}.
     */
    private void rollbackPendingCommits() {
        HoodieTable table = HoodieTable.getHoodieTable(this.createMetaClient(true), this.config, this.jsc);
        HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
        // Typed as List<String>: iterating a raw List with a String loop variable does not type-check.
        List<String> pendingCommitTimes = inflightTimeline.getReverseOrderedInstants()
            .map(HoodieInstant::getTimestamp)
            .collect(Collectors.toList());
        for (String pendingCommitTime : pendingCommitTimes) {
            this.rollback(pendingCommitTime);
        }
    }

    /**
     * Runs the compaction scheduled at the given instant time.
     *
     * <p>If the compaction is already inflight, it is first rolled back to the requested state and
     * the meta client, table and pending-compaction timeline are re-created. The compaction then
     * runs only if a requested compaction instant exists for {@code compactionInstantTime}.
     *
     * @param compactionInstantTime instant time of the scheduled compaction
     * @param autoCommit            whether the compaction should be committed once it has run
     * @return write statuses produced by the compaction
     * @throws IOException           propagated from rollback or from the compaction run
     * @throws IllegalStateException if no compaction request exists at {@code compactionInstantTime}
     */
    private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
        HoodieTableMetaClient metaClient = this.createMetaClient(true);
        HoodieTable table = HoodieTable.getHoodieTable(metaClient, this.config, this.jsc);
        HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();

        HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
        if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
            this.rollbackInflightCompaction(inflightInstant, table);
            // Re-create the views so they reflect the state after the rollback.
            metaClient = this.createMetaClient(true);
            table = HoodieTable.getHoodieTable(metaClient, this.config, this.jsc);
            pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
        }

        HoodieInstant requestedInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
        if (!pendingCompactionTimeline.containsInstant(requestedInstant)) {
            throw new IllegalStateException("No Compaction request available at " + compactionInstantTime + " to run compaction");
        }
        return this.runCompaction(requestedInstant, metaClient.getActiveTimeline(), autoCommit);
    }

    /**
     * Executes a requested compaction and hands the result to the commit step.
     *
     * <p>The compaction plan is loaded first, then the instant is transitioned from requested to
     * inflight, the compaction timer is started, and the table performs the compaction. The
     * resulting statuses are persisted at the configured storage level before being passed to
     * {@code commitCompaction} together with the plan's extra metadata.
     *
     * @param compactionInstant requested compaction instant to run
     * @param activeTimeline    timeline on which the instant is transitioned to inflight
     * @param autoCommit        whether the commit step should actually commit
     * @return write statuses produced by the compaction
     * @throws IOException if loading the plan or running the compaction fails
     */
    private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline, boolean autoCommit) throws IOException {
        String compactionInstantTime = compactionInstant.getTimestamp();
        HoodieTableMetaClient metaClient = this.createMetaClient(true);
        HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, compactionInstantTime);

        // Mark the compaction inflight before any work is done.
        activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
        this.compactionTimer = this.metrics.getCompactionCtx();

        HoodieTable table = HoodieTable.getHoodieTable(metaClient, this.config, this.jsc);
        JavaRDD<WriteStatus> statuses = table.compact(this.jsc, compactionInstantTime, plan);
        statuses.persist(this.config.getWriteStatusStorageLevel());

        Option<Map<String, String>> planMetadata = Option.ofNullable(plan.getExtraMetadata());
        this.commitCompaction(statuses, table, compactionInstantTime, autoCommit, planMetadata);
        return statuses;
    }

    /**
     * Commits the compaction when {@code autoCommit} is set, and records commit metrics if a
     * compaction timer is active. When {@code autoCommit} is not set, only a log line is written.
     *
     * @param compactedStatuses    write statuses produced by the compaction
     * @param table                table the compaction ran against
     * @param compactionCommitTime instant time of the compaction
     * @param autoCommit           whether to perform the commit
     * @param extraMetadata        optional metadata to store with the commit
     * @throws HoodieCommitException if {@code compactionCommitTime} cannot be parsed while updating metrics
     */
    protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table, String compactionCommitTime, boolean autoCommit, Option<Map<String, String>> extraMetadata) {
        if (!autoCommit) {
            LOG.info("Compaction did not run for commit " + compactionCommitTime);
            return;
        }

        HoodieCommitMetadata metadata = this.doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
        if (this.compactionTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.compactionTimer.stop());
            long commitEpochMs;
            try {
                commitEpochMs = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime();
            }
            catch (ParseException e) {
                throw new HoodieCommitException("Commit time is not of valid format.Failed to commit compaction " + this.config.getBasePath() + " at time " + compactionCommitTime, e);
            }
            this.metrics.updateCommitMetrics(commitEpochMs, durationInMs, metadata, "compaction");
        }
        LOG.info("Compacted successfully on commit " + compactionCommitTime);
    }

    /**
     * Rolls back an inflight compaction and moves its instant back to the requested state.
     *
     * @param inflightInstant inflight compaction instant to roll back
     * @param table           table on which the rollback is performed
     * @throws IOException propagated from the rollback or the timeline transition
     */
    @VisibleForTesting
    void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
        table.rollback(this.jsc, inflightInstant, false);
        // The timeline is fetched only after the rollback has completed.
        HoodieActiveTimeline timeline = table.getActiveTimeline();
        timeline.revertCompactionInflightToRequested(inflightInstant);
    }

    /**
     * Builds the commit metadata for a finished compaction and completes its instant.
     *
     * <p>Write stats are collected from {@code writeStatuses} and added to the metadata per
     * partition path, the write is finalized, any extra metadata is added, and the inflight
     * compaction instant is transitioned to complete with the metadata serialized as UTF-8 JSON.
     *
     * @param table                table the compaction ran against
     * @param writeStatuses        write statuses produced by the compaction
     * @param compactionCommitTime instant time of the compaction
     * @param extraMetadata        optional key/value pairs to add to the commit metadata
     * @return the commit metadata that was written
     * @throws HoodieCompactionException if serializing the metadata or completing the instant fails
     */
    private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses, String compactionCommitTime, Option<Map<String, String>> extraMetadata) {
        HoodieTableMetaClient metaClient = table.getMetaClient();
        // Typed as List<HoodieWriteStat>: iterating a raw List with a typed loop variable does not type-check.
        List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
        HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
        for (HoodieWriteStat stat : writeStats) {
            metadata.addWriteStat(stat.getPartitionPath(), stat);
        }
        this.finalizeWrite(table, compactionCommitTime, writeStats);
        extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
        LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
        HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
        try {
            HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "compaction", compactionCommitTime);
            activeTimeline.transitionCompactionInflightToComplete(inflightInstant, Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
        }
        catch (IOException e) {
            throw new HoodieCompactionException("Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e);
        }
        return metadata;
    }

    /**
     * Schedules a compaction and, if one was scheduled, runs it immediately with auto-commit on.
     *
     * @param extraMetadata optional metadata to store with the compaction plan
     * @return the scheduled compaction instant time, or an empty option if nothing was scheduled
     * @throws IOException       if scheduling the compaction fails
     * @throws HoodieIOException if running the scheduled compaction fails with an {@link IOException}
     */
    private Option<String> forceCompact(Option<Map<String, String>> extraMetadata) throws IOException {
        Option<String> scheduledInstantTime = this.scheduleCompaction(extraMetadata);
        if (scheduledInstantTime.isPresent()) {
            try {
                this.compact(scheduledInstantTime.get(), true);
            }
            catch (IOException ioe) {
                // Failures while running the compaction are rethrown unchecked, with the cause kept.
                throw new HoodieIOException(ioe.getMessage(), ioe);
            }
        }
        return scheduledInstantTime;
    }
}

