/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.BaseHoodieClient;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieTableServiceManagerClient;
import org.apache.hudi.client.RunsTableService;
import org.apache.hudi.client.TableWriteStats;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.timeline.TimelineArchivers;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLogCompactException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CommonClientUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieClient
implements RunsTableService {
    private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieTableServiceClient.class);
    // Timer contexts obtained from the metrics object when the matching table service starts
    // (see setTableServiceTimer, compact, logCompact and cluster); the complete* methods stop
    // them to compute the duration that is reported to the metrics object.
    protected transient Timer.Context compactionTimer;
    protected transient Timer.Context clusteringTimer;
    protected transient Timer.Context logCompactionTimer;
    // Assigned on first use by startAsyncCleanerService / startAsyncArchiveService.
    protected transient AsyncCleanerService asyncCleanerService;
    protected transient AsyncArchiveService asyncArchiveService;
    // Assigned through setPendingInflightAndRequestedInstants and passed to resolveWriteConflict in preCommit.
    protected Set<String> pendingInflightAndRequestedInstants;

    /**
     * Creates the table service client by forwarding every argument to the
     * {@link BaseHoodieClient} constructor.
     *
     * @param context         engine context
     * @param clientConfig    write configuration
     * @param timelineService optional embedded timeline service
     */
    protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineService) {
        super(context, clientConfig, timelineService);
    }

    /**
     * Starts the async cleaner service.
     *
     * <p>If a service instance is already held it is started again with a {@code null}
     * argument; otherwise a new instance is requested from
     * {@code AsyncCleanerService.startAsyncCleaningIfEnabled} and stored.
     *
     * @param writeClient write client handed to the cleaner service factory
     */
    protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
        if (asyncCleanerService != null) {
            asyncCleanerService.start(null);
            return;
        }
        asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
    }

    /**
     * Starts the async archive service.
     *
     * <p>If a service instance is already held it is started again with a {@code null}
     * argument; otherwise a new instance is requested from
     * {@code AsyncArchiveService.startAsyncArchiveIfEnabled} and stored.
     *
     * @param writeClient write client handed to the archive service factory
     */
    protected void startAsyncArchiveService(BaseHoodieWriteClient writeClient) {
        if (asyncArchiveService != null) {
            asyncArchiveService.start(null);
            return;
        }
        asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
    }

    /**
     * Hands the currently held async cleaner service to
     * {@code AsyncCleanerService.waitForCompletion}.
     */
    protected void asyncClean() {
        AsyncCleanerService.waitForCompletion(asyncCleanerService);
    }

    /**
     * Hands the currently held async archive service to
     * {@code AsyncArchiveService.waitForCompletion}.
     */
    protected void asyncArchive() {
        AsyncArchiveService.waitForCompletion(asyncArchiveService);
    }

    /**
     * Obtains a fresh timer context from the metrics object for the given table service
     * operation and stores it in the matching timer field.
     *
     * <p>Only {@code CLUSTER}, {@code COMPACT} and {@code LOG_COMPACT} are handled; any other
     * operation type leaves all timer fields untouched.
     *
     * @param operationType operation whose timer should be (re)started
     */
    protected void setTableServiceTimer(WriteOperationType operationType) {
        switch (operationType) {
            case CLUSTER:
                clusteringTimer = metrics.getClusteringCtx();
                return;
            case COMPACT:
                compactionTimer = metrics.getCompactionCtx();
                return;
            case LOG_COMPACT:
                logCompactionTimer = metrics.getLogCompactionCtx();
                return;
            default:
                // No timer is tracked for the remaining operation types.
                return;
        }
    }

    /**
     * Stores the set of pending inflight and requested instants; {@code preCommit} later passes
     * this set to {@code resolveWriteConflict}.
     *
     * @param pendingInflightAndRequestedInstants instant identifiers to remember (stored by reference, not copied)
     */
    protected void setPendingInflightAndRequestedInstants(Set<String> pendingInflightAndRequestedInstants) {
        this.pendingInflightAndRequestedInstants = pendingInflightAndRequestedInstants;
    }

    /**
     * Pre-commit step: creates a table from the client configuration and storage configuration,
     * then calls {@code resolveWriteConflict} with it, the commit metadata and the stored set of
     * pending inflight and requested instants.
     *
     * @param metadata commit metadata of the operation about to be committed
     */
    protected void preCommit(HoodieCommitMetadata metadata) {
        resolveWriteConflict(createTable(config, storageConf), metadata, pendingInflightAndRequestedInstants);
    }

    /**
     * Schedules a compaction inline and, when an instant was scheduled, executes and completes it
     * right away.
     *
     * @param extraMetadata optional extra metadata passed to the scheduling call
     * @return the scheduled compaction instant time, or an empty option when nothing was scheduled
     */
    protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = inlineScheduleCompaction(extraMetadata);
        if (scheduledInstant.isPresent()) {
            compact(scheduledInstant.get(), true);
        }
        return scheduledInstant;
    }

    /**
     * Inline compaction entry point used by {@code runTableServicesInline}.
     *
     * <p>When compaction should be delegated to the table service manager, only a compaction is
     * scheduled. Otherwise all pending compactions are run first and a new compaction is then
     * scheduled and executed inline.
     *
     * @param table         table whose pending compactions are run
     * @param extraMetadata optional extra metadata passed to the scheduling call
     */
    private void inlineCompaction(HoodieTable table, Option<Map<String, String>> extraMetadata) {
        if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
            scheduleCompaction(extraMetadata);
            return;
        }
        runAnyPendingCompactions(table);
        inlineCompaction(extraMetadata);
    }

    /**
     * Executes the log compaction planned at the given instant time.
     *
     * <p>The call is rejected when the active timeline holds a {@code commit} or
     * {@code compaction} instant whose requested time is greater than the log compaction instant.
     * If an inflight log compaction already exists for the instant, it is rolled back, the
     * timeline is reloaded and the call aborts with an exception.
     *
     * @param logCompactionInstantTime instant time of the log compaction to run
     * @param shouldComplete           whether to also commit the log compaction
     * @return the write metadata converted to the output representation
     * @throws HoodieLogCompactException if a commit/compaction instant with a greater timestamp exists
     * @throws HoodieException           if an inflight log compaction was found (after rolling it back)
     */
    protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
        HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());

        // Look for any commit/compaction instant that is newer than the requested log compaction.
        Set<String> blockingActions = CollectionUtils.createSet("commit", "compaction");
        Option<HoodieInstant> newerInstant = Option.fromJavaOptional(
                table.getActiveTimeline().getInstantsAsStream()
                        .filter(instant -> blockingActions.contains(instant.getAction()))
                        .filter(instant -> InstantComparison.compareTimestamps(instant.requestedTime(), InstantComparison.GREATER_THAN, logCompactionInstantTime))
                        .findFirst());
        if (newerInstant.isPresent()) {
            throw new HoodieLogCompactException(String.format("Cannot log compact since a compaction instant with greater timestamp exists. Instant details %s", newerInstant.get()));
        }

        HoodieTimeline pendingLogCompactions = table.getActiveTimeline().filterPendingLogCompactionTimeline();
        HoodieInstant inflightLogCompaction = table.getMetaClient().getInstantGenerator().getLogCompactionInflightInstant(logCompactionInstantTime);
        if (pendingLogCompactions.containsInstant(inflightLogCompaction)) {
            LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
            table.rollbackInflightLogCompaction(inflightLogCompaction, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager);
            table.getMetaClient().reloadActiveTimeline();
            throw new HoodieException("Execution is aborted since it found an Inflight logcompaction,log compaction plans are mutable plans, so reschedule another logcompaction.");
        }

        logCompactionTimer = metrics.getLogCompactionCtx();
        // The returned markers object is not used here; the call is kept as in the original flow.
        WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
        HoodieWriteMetadata<T> rawMetadata = table.logCompact(context, logCompactionInstantTime);
        HoodieWriteMetadata<T> updatedMetadata = partialUpdateTableMetadata(table, rawMetadata, logCompactionInstantTime, WriteOperationType.LOG_COMPACT);
        HoodieWriteMetadata<O> outputMetadata = convertToOutputMetadata(updatedMetadata);
        if (shouldComplete) {
            commitLogCompaction(logCompactionInstantTime, outputMetadata, Option.of(table));
        }
        return outputMetadata;
    }

    /**
     * Schedules a log compaction and, when an instant was scheduled, executes and completes it
     * right away.
     *
     * @param extraMetadata optional extra metadata passed to the scheduling call
     * @return the scheduled log compaction instant time, or an empty option when nothing was scheduled
     */
    protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = scheduleLogCompaction(extraMetadata);
        if (scheduledInstant.isPresent()) {
            logCompact(scheduledInstant.get(), true);
        }
        return scheduledInstant;
    }

    /**
     * Runs and completes every pending compaction found on the write timeline of the given table,
     * in timeline order.
     *
     * @param table table whose active timeline is inspected
     */
    protected void runAnyPendingCompactions(HoodieTable table) {
        List<HoodieInstant> pendingCompactions = table.getActiveTimeline()
                .getWriteTimeline()
                .filterPendingCompactionTimeline()
                .getInstants();
        for (HoodieInstant pending : pendingCompactions) {
            LOG.info("Running previously failed inflight compaction at instant {}", pending);
            compact(pending.requestedTime(), true);
        }
    }

    /**
     * Runs and completes every pending log compaction found on the write timeline of the given
     * table.
     *
     * @param table table whose active timeline is inspected
     */
    protected void runAnyPendingLogCompactions(HoodieTable table) {
        Stream<HoodieInstant> pendingLogCompactions = table.getActiveTimeline()
                .getWriteTimeline()
                .filterPendingLogCompactionTimeline()
                .getInstantsAsStream();
        pendingLogCompactions.forEach(pending -> {
            LOG.info("Running previously failed inflight log compaction at instant {}", pending);
            logCompact(pending.requestedTime(), true);
        });
    }

    /**
     * Schedules a compaction as part of inline table services; delegates to
     * {@link #scheduleCompaction(Option)}.
     *
     * @param extraMetadata optional extra metadata passed to the scheduling call
     * @return the result of {@link #scheduleCompaction(Option)}
     */
    protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
        return scheduleCompaction(extraMetadata);
    }

    /**
     * Schedules a compaction by delegating to {@code scheduleTableService} with
     * {@code TableServiceType.COMPACT}.
     *
     * @param extraMetadata optional extra metadata passed to the scheduling call
     * @return the result of the delegated scheduling call
     * @throws HoodieIOException declared by this method's signature
     */
    public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        return scheduleTableService(extraMetadata, TableServiceType.COMPACT);
    }

    /**
     * Executes the compaction at the given instant time on a freshly created table.
     *
     * @param compactionInstantTime instant time of the compaction to run
     * @param shouldComplete        whether to also commit the compaction
     * @return the write metadata converted to the output representation
     */
    protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
        return compact(createTable(config, context.getStorageConf()), compactionInstantTime, shouldComplete);
    }

    /**
     * Executes the compaction at the given instant time on the supplied table.
     *
     * <p>If an inflight compaction already exists for the instant, it is rolled back and the
     * active timeline is reloaded before the compaction is run again.
     *
     * @param table                 table to compact
     * @param compactionInstantTime instant time of the compaction to run
     * @param shouldComplete        whether to also commit the compaction
     * @return the write metadata converted to the output representation
     */
    protected HoodieWriteMetadata<O> compact(HoodieTable<?, I, ?, T> table, String compactionInstantTime, boolean shouldComplete) {
        HoodieTimeline pendingCompactions = table.getActiveTimeline().filterPendingCompactionTimeline();
        HoodieInstant inflightCompaction = table.getMetaClient().getInstantGenerator().getCompactionInflightInstant(compactionInstantTime);
        if (pendingCompactions.containsInstant(inflightCompaction)) {
            table.rollbackInflightCompaction(inflightCompaction, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager);
            table.getMetaClient().reloadActiveTimeline();
        }

        compactionTimer = metrics.getCompactionCtx();
        HoodieWriteMetadata<T> rawMetadata = table.compact(context, compactionInstantTime);
        HoodieWriteMetadata<T> updatedMetadata = partialUpdateTableMetadata(table, rawMetadata, compactionInstantTime, WriteOperationType.COMPACT);
        HoodieWriteMetadata<O> outputMetadata = convertToOutputMetadata(updatedMetadata);
        if (shouldComplete) {
            commitCompaction(compactionInstantTime, outputMetadata, Option.of(table));
        }
        return outputMetadata;
    }

    /**
     * Hook invoked after a table service produced its write metadata and before that metadata is
     * converted to the output representation. This base implementation returns the given write
     * metadata unchanged.
     *
     * @param table              table the service ran on
     * @param writeMetadata      write metadata produced by the table service
     * @param instantTime        instant time of the table service
     * @param writeOperationType operation type of the table service
     * @return {@code writeMetadata}, unmodified
     */
    protected HoodieWriteMetadata<T> partialUpdateTableMetadata(HoodieTable table, HoodieWriteMetadata<T> writeMetadata, String instantTime, WriteOperationType writeOperationType) {
        return writeMetadata;
    }

    /**
     * Commits a compaction: triggers the writes, stitches the data table write stats into the
     * write metadata, emits the compaction-completed metric and completes the compaction.
     *
     * @param compactionInstantTime   instant time of the compaction
     * @param compactionWriteMetadata write metadata produced by the compaction; its commit metadata must be present
     * @param tableOpt                table to use; a new table is created when empty
     */
    public void commitCompaction(String compactionInstantTime, HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt) {
        TableWriteStats stats = triggerWritesAndFetchWriteStats(compactionWriteMetadata);
        CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata, stats.getDataTableWriteStats());
        metrics.emitCompactionCompleted();
        HoodieTable resolvedTable = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
        HoodieCommitMetadata commitMetadata = compactionWriteMetadata.getCommitMetadata().get();
        completeCompaction(commitMetadata, resolvedTable, compactionInstantTime, stats.getMetadataTableWriteStats());
    }

    /**
     * Produces the {@link TableWriteStats} for the given table service write metadata.
     *
     * <p>Callers in this class read both {@code getDataTableWriteStats()} and
     * {@code getMetadataTableWriteStats()} from the result.
     * NOTE(review): how the writes are triggered is engine specific and not visible here — confirm
     * against the concrete implementations.
     *
     * @param var1 write metadata of the table service being committed
     * @return the write stats gathered for the table service
     */
    protected abstract TableWriteStats triggerWritesAndFetchWriteStats(HoodieWriteMetadata<O> var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Completes a compaction inside a transaction state change.
     *
     * <p>Within the state change the write is finalized, the metadata table is written and the
     * inflight compaction is transitioned to complete. The state change is always ended and
     * resources are always released, even on failure. Afterwards the marker directory is deleted
     * quietly and, if a compaction timer is running, commit metrics are updated.
     *
     * @param metadata                  commit metadata of the compaction
     * @param table                     table the compaction ran on
     * @param compactionCommitTime      instant time of the compaction
     * @param partialMetadataWriteStats metadata table write stats forwarded to {@code writeToMetadataTable}
     */
    protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime, List<HoodieWriteStat> partialMetadataWriteStats) {
        context.setJobStatus(getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
        List<HoodieWriteStat> stats = metadata.getWriteStats();
        handleWriteErrors(stats, TableServiceType.COMPACT);
        HoodieInstant inflightCompaction = table.getMetaClient().getInstantGenerator().getCompactionInflightInstant(compactionCommitTime);
        try {
            txnManager.beginStateChange(Option.of(inflightCompaction), Option.empty());
            finalizeWrite(table, compactionCommitTime, stats);
            writeToMetadataTable(table, compactionCommitTime, metadata, partialMetadataWriteStats);
            LOG.info("Committing Compaction {}", compactionCommitTime);
            CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
            LOG.debug("Compaction {} finished with result: {}", compactionCommitTime, metadata);
        } finally {
            txnManager.endStateChange(Option.of(inflightCompaction));
            releaseResources(compactionCommitTime);
        }

        WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
                .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

        if (compactionTimer != null) {
            long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
            TimelineUtils.parseDateFromInstantTimeSafely(compactionCommitTime)
                    .ifPresent(parsed -> metrics.updateCommitMetrics(parsed.getTime(), durationInMs, metadata, "compaction"));
        }
        LOG.info("Compacted successfully on commit {}", compactionCommitTime);
    }

    /**
     * Writes the commit metadata of a table service to the metadata table by delegating to
     * {@code writeTableMetadata}. The partial metadata write stats are not used by this base
     * implementation.
     *
     * @param table                     table the service ran on
     * @param instantTime               instant time of the table service
     * @param metadata                  commit metadata to write
     * @param partialMetadataWriteStats metadata table write stats (ignored here)
     */
    protected void writeToMetadataTable(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, List<HoodieWriteStat> partialMetadataWriteStats) {
        writeTableMetadata(table, instantTime, metadata);
    }

    /**
     * Commits a log compaction: triggers the writes, stitches the data table write stats into the
     * write metadata, emits the compaction-completed metric and completes the log compaction.
     *
     * @param compactionInstantTime instant time of the log compaction
     * @param writeMetadata         write metadata produced by the log compaction; its commit metadata must be present
     * @param tableOpt              table to use; a new table is created when empty
     */
    public void commitLogCompaction(String compactionInstantTime, HoodieWriteMetadata<O> writeMetadata, Option<HoodieTable> tableOpt) {
        TableWriteStats stats = triggerWritesAndFetchWriteStats(writeMetadata);
        CommonClientUtils.stitchCompactionHoodieWriteStats(writeMetadata, stats.getDataTableWriteStats());
        metrics.emitCompactionCompleted();
        HoodieTable resolvedTable = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
        HoodieCommitMetadata commitMetadata = writeMetadata.getCommitMetadata().get();
        completeLogCompaction(commitMetadata, resolvedTable, compactionInstantTime, stats.getMetadataTableWriteStats());
    }

    /**
     * Schedules a log compaction by delegating to {@code scheduleTableService} with
     * {@code TableServiceType.LOG_COMPACT}.
     *
     * @param extraMetadata optional extra metadata passed to the scheduling call
     * @return the result of the delegated scheduling call
     * @throws HoodieIOException declared by this method's signature
     */
    public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        return scheduleTableService(extraMetadata, TableServiceType.LOG_COMPACT);
    }

    /**
     * Executes the log compaction at the given instant time without committing it.
     *
     * @param logCompactionInstantTime instant time of the log compaction to run
     * @return the write metadata converted to the output representation
     */
    public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
        final boolean shouldComplete = false;
        return logCompact(logCompactionInstantTime, shouldComplete);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Completes a log compaction inside a transaction state change.
     *
     * <p>Within the state change the pre-commit step runs, the write is finalized, the metadata
     * table is written and the inflight log compaction is transitioned to complete. The state
     * change is always ended and resources are always released, even on failure. Afterwards the
     * marker directory is deleted quietly and, if a log compaction timer is running, commit
     * metrics are updated.
     *
     * @param metadata                  commit metadata of the log compaction
     * @param table                     table the log compaction ran on
     * @param logCompactionCommitTime   instant time of the log compaction
     * @param partialMetadataWriteStats metadata table write stats forwarded to {@code writeToMetadataTable}
     */
    protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime, List<HoodieWriteStat> partialMetadataWriteStats) {
        context.setJobStatus(getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
        List<HoodieWriteStat> stats = metadata.getWriteStats();
        handleWriteErrors(stats, TableServiceType.LOG_COMPACT);
        HoodieInstant inflightLogCompaction = table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, "logcompaction", logCompactionCommitTime);
        try {
            txnManager.beginStateChange(Option.of(inflightLogCompaction), Option.empty());
            preCommit(metadata);
            finalizeWrite(table, logCompactionCommitTime, stats);
            writeToMetadataTable(table, logCompactionCommitTime, metadata, partialMetadataWriteStats);
            LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
            CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
            LOG.debug("Log Compaction {} finished with result {}", logCompactionCommitTime, metadata);
        } finally {
            txnManager.endStateChange(Option.of(inflightLogCompaction));
            releaseResources(logCompactionCommitTime);
        }

        WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
                .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

        if (logCompactionTimer != null) {
            long durationInMs = metrics.getDurationInMs(logCompactionTimer.stop());
            TimelineUtils.parseDateFromInstantTimeSafely(logCompactionCommitTime)
                    .ifPresent(parsed -> metrics.updateCommitMetrics(parsed.getTime(), durationInMs, metadata, "logcompaction"));
        }
        LOG.info("Log Compacted successfully on commit {}", logCompactionCommitTime);
    }

    /**
     * Schedules clustering by delegating to {@code scheduleTableService} with
     * {@code TableServiceType.CLUSTER}.
     *
     * @param extraMetadata optional extra metadata passed to the scheduling call
     * @return the result of the delegated scheduling call
     * @throws HoodieIOException declared by this method's signature
     */
    public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        return scheduleTableService(extraMetadata, TableServiceType.CLUSTER);
    }

    /**
     * Executes the clustering planned at the given instant.
     *
     * <p>If an inflight instant exists for the clustering instant and it is a pending clustering
     * instant, it is rolled back and the active timeline is reloaded before clustering runs. If
     * the inflight instant is not a pending clustering instant, the call fails.
     *
     * @param clusteringInstant instant time of the clustering to run
     * @param shouldComplete    whether to also commit the clustering
     * @return the write metadata converted to the output representation
     * @throws HoodieClusteringException if the inflight instant at that time is not a clustering instant
     */
    public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
        HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
        HoodieTimeline pendingClustering = table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline();
        Option<HoodieInstant> inflightOpt = ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline(), table.getMetaClient().getInstantGenerator());
        if (inflightOpt.isPresent()) {
            if (!pendingClustering.isPendingClusteringInstant(inflightOpt.get().requestedTime())) {
                throw new HoodieClusteringException("Non clustering replace-commit inflight at timestamp " + clusteringInstant);
            }
            table.rollbackInflightClustering(inflightOpt.get(), commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager);
            table.getMetaClient().reloadActiveTimeline();
        }

        clusteringTimer = metrics.getClusteringCtx();
        LOG.info("Starting clustering at {} for table {}", clusteringInstant, table.getConfig().getBasePath());
        HoodieWriteMetadata<T> rawMetadata = table.cluster(context, clusteringInstant);
        HoodieWriteMetadata<T> updatedMetadata = partialUpdateTableMetadata(table, rawMetadata, clusteringInstant, WriteOperationType.CLUSTER);
        HoodieWriteMetadata<O> outputMetadata = convertToOutputMetadata(updatedMetadata);
        if (shouldComplete) {
            commitClustering(outputMetadata, table, clusteringInstant);
        }
        return outputMetadata;
    }

    /**
     * Rolls back the inflight clustering at the given instant, if one exists, and reloads the
     * active timeline.
     *
     * @param clusteringInstant instant time of the clustering to purge
     * @return {@code true} if an inflight clustering instant was found and rolled back,
     *         {@code false} otherwise
     */
    public boolean purgePendingClustering(String clusteringInstant) {
        HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
        Option<HoodieInstant> inflightOpt = ClusteringUtils.getInflightClusteringInstant(clusteringInstant, table.getActiveTimeline(), table.getMetaClient().getInstantGenerator());
        if (!inflightOpt.isPresent()) {
            return false;
        }
        table.rollbackInflightClustering(inflightOpt.get(), commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true, txnManager);
        table.getMetaClient().reloadActiveTimeline();
        return true;
    }

    /**
     * Converts table service write metadata from the {@code T} representation into the
     * {@code O} representation returned by {@code compact}, {@code logCompact} and {@code cluster}.
     *
     * @param var1 write metadata to convert
     * @return the converted write metadata
     */
    protected abstract HoodieWriteMetadata<O> convertToOutputMetadata(HoodieWriteMetadata<T> var1);

    /**
     * Pre-commit validation hook for clustering. This base implementation does nothing when no
     * pre-commit validators are configured and fails otherwise.
     *
     * @param writeMetadata write metadata of the clustering (unused here)
     * @param table         table the clustering ran on (unused here)
     * @param instantTime   instant time of the clustering (unused here)
     * @throws HoodieIOException if pre-commit validators are configured
     */
    protected void runPrecommitValidationForClustering(HoodieWriteMetadata<O> writeMetadata, HoodieTable table, String instantTime) {
        boolean validatorsConfigured = !StringUtils.isNullOrEmpty(config.getPreCommitValidators());
        if (validatorsConfigured) {
            throw new HoodieIOException("Precommit validation not implemented for all engines yet");
        }
    }

    /**
     * Commits a clustering: triggers the writes, records the data table write stats and the
     * replaced file ids on both the write metadata and the replace commit metadata, runs
     * pre-commit validation, optionally reports file creation metrics and completes the
     * clustering.
     *
     * @param clusteringWriteMetadata write metadata produced by the clustering; its commit metadata
     *                                must be present and be a {@link HoodieReplaceCommitMetadata}
     * @param table                   table the clustering ran on
     * @param clusteringCommitTime    instant time of the clustering
     */
    private void commitClustering(HoodieWriteMetadata<O> clusteringWriteMetadata, HoodieTable table, String clusteringCommitTime) {
        TableWriteStats stats = triggerWritesAndFetchWriteStats(clusteringWriteMetadata);
        clusteringWriteMetadata.setWriteStats(stats.getDataTableWriteStats());

        HoodieReplaceCommitMetadata replaceMetadata = (HoodieReplaceCommitMetadata) clusteringWriteMetadata.getCommitMetadata().get();
        for (HoodieWriteStat dataStat : stats.getDataTableWriteStats()) {
            replaceMetadata.addWriteStat(dataStat.getPartitionPath(), dataStat);
        }

        HoodieClusteringPlan plan = ClusteringUtils.getPendingClusteringPlan(table.getMetaClient(), clusteringCommitTime);
        Map<String, List<String>> replacedFileIds = CommonClientUtils.getPartitionToReplacedFileIds(plan, clusteringWriteMetadata, config);
        clusteringWriteMetadata.setPartitionToReplaceFileIds(replacedFileIds);
        replaceMetadata.setPartitionToReplaceFileIds(replacedFileIds);

        runPrecommitValidationForClustering(clusteringWriteMetadata, table, clusteringCommitTime);

        if (config.isMetricsOn()) {
            // Report the total create time of every write stat that carries runtime stats.
            clusteringWriteMetadata.getWriteStats().ifPresent(writeStatList -> writeStatList.stream()
                    .filter(stat -> stat.getRuntimeStats() != null)
                    .map(stat -> stat.getRuntimeStats().getTotalCreateTime())
                    .forEach(this.metrics::updateClusteringFileCreationMetrics));
        }

        completeClustering(replaceMetadata, stats.getDataTableWriteStats(), table, clusteringCommitTime, stats.getMetadataTableWriteStats());
    }

    /**
     * Completes a clustering inside a transaction state change.
     *
     * <p>Any exception raised inside the state change is wrapped in a
     * {@link HoodieClusteringException}. The state change is always ended and resources are always
     * released. Afterwards the marker directory is deleted quietly and, if a clustering timer is
     * running, commit metrics are updated.
     *
     * @param replaceCommitMetadata     replace commit metadata of the clustering
     * @param writeStats                data table write stats checked for errors and finalized
     * @param table                     table the clustering ran on
     * @param clusteringCommitTime      instant time of the clustering
     * @param partialMetadataWriteStats metadata table write stats forwarded to {@code writeToMetadataTable}
     */
    private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadata, List<HoodieWriteStat> writeStats, HoodieTable table, String clusteringCommitTime, List<HoodieWriteStat> partialMetadataWriteStats) {
        this.handleWriteErrors(writeStats, TableServiceType.CLUSTER);
        // The inflight clustering instant is fetched with an unchecked get(): it is expected to exist at this point.
        HoodieInstant clusteringInstant = (HoodieInstant)ClusteringUtils.getInflightClusteringInstant((String)clusteringCommitTime, (HoodieActiveTimeline)table.getActiveTimeline(), (InstantGenerator)table.getMetaClient().getInstantGenerator()).get();
        try {
            this.txnManager.beginStateChange((Option<HoodieInstant>)Option.of((Object)clusteringInstant), (Option<HoodieInstant>)Option.empty());
            this.finalizeWrite(table, clusteringCommitTime, writeStats);
            // The pre-commit step runs only when isPreCommitRequired() says so.
            if (this.isPreCommitRequired()) {
                this.preCommit((HoodieCommitMetadata)replaceCommitMetadata);
            }
            this.writeToMetadataTable(table, clusteringInstant.requestedTime(), (HoodieCommitMetadata)replaceCommitMetadata, partialMetadataWriteStats);
            LOG.info("Committing Clustering {} for table {}", (Object)clusteringCommitTime, (Object)table.getConfig().getBasePath());
            // Transition inflight -> complete; the callback commits through the table format using the completed instant.
            ClusteringUtils.transitionClusteringOrReplaceInflightToComplete((boolean)false, (HoodieInstant)clusteringInstant, (HoodieReplaceCommitMetadata)replaceCommitMetadata, (HoodieActiveTimeline)table.getActiveTimeline(), completedInstant -> table.getMetaClient().getTableFormat().commit((HoodieCommitMetadata)replaceCommitMetadata, completedInstant, table.getContext(), table.getMetaClient(), table.getViewManager()));
            LOG.debug("Clustering {} finished with result {}", (Object)clusteringCommitTime, (Object)replaceCommitMetadata);
        }
        catch (Exception e) {
            // Every failure inside the state change is reported as a clustering exception with the cause preserved.
            throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, (Throwable)e);
        }
        finally {
            this.txnManager.endStateChange((Option<HoodieInstant>)Option.of((Object)clusteringInstant));
            this.releaseResources(clusteringCommitTime);
        }
        // Reached only on success: clean up markers and report metrics.
        WriteMarkersFactory.get(this.config.getMarkersType(), table, clusteringCommitTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        if (this.clusteringTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.clusteringTimer.stop());
            TimelineUtils.parseDateFromInstantTimeSafely((String)clusteringCommitTime).ifPresent(parsedInstant -> this.metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, (HoodieCommitMetadata)replaceCommitMetadata, "clustering"));
        }
        LOG.info("Clustering successfully on commit {} for table {}", (Object)clusteringCommitTime, (Object)table.getConfig().getBasePath());
    }

    /**
     * Runs or schedules the table services that are configured to happen inline, recording in the
     * commit metadata which inline services were enabled.
     *
     * <p>Nothing happens when table services are disabled, or when no table service is configured
     * to be executed or scheduled inline. Otherwise compaction, log compaction, clustering and
     * partition TTL management are handled in that order.
     *
     * @param table         table to run the services on
     * @param metadata      commit metadata that receives the inline-service flags
     * @param extraMetadata optional extra metadata passed to the scheduling calls
     */
    protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        if (!this.tableServicesEnabled(this.config)) {
            return;
        }
        if (!this.config.areAnyTableServicesExecutedInline().booleanValue() && !this.config.areAnyTableServicesScheduledInline().booleanValue()) {
            return;
        }
        // Compaction: execute inline when enabled, recording the flag either way.
        if (this.config.inlineCompactionEnabled()) {
            metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
            this.inlineCompaction(table, extraMetadata);
        } else {
            metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
        }
        // Schedule-only compaction: only when inline execution is off and no compaction is already pending.
        if (!this.config.inlineCompactionEnabled() && this.config.scheduleInlineCompaction() && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
            metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
            this.inlineScheduleCompaction(extraMetadata);
        }
        // Log compaction: finish pending ones first, then schedule and run a new one.
        if (this.config.inlineLogCompactionEnabled()) {
            this.runAnyPendingLogCompactions(table);
            metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
            this.inlineLogCompact(extraMetadata);
        } else {
            metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
        }
        // Clustering: execute inline when enabled, recording the flag either way.
        if (this.config.inlineClusteringEnabled()) {
            metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
            this.inlineClustering(table, extraMetadata);
        } else {
            metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
        }
        // Schedule-only clustering: only when inline execution is off and no clustering instant is pending.
        if (!this.config.inlineClusteringEnabled() && this.config.scheduleInlineClustering() && !table.getActiveTimeline().getLastPendingClusterInstant().isPresent()) {
            metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
            this.inlineScheduleClustering(extraMetadata);
        }
        // Partition TTL: start a delete-partition commit and let the table manage TTL at that instant.
        if (this.config.isInlinePartitionTTLEnable()) {
            String instantTime = this.startDeletePartitionCommit(table.getMetaClient()).requestedTime();
            table.managePartitionTTL(table.getContext(), instantTime);
        }
    }

    /**
     * Schedules the given table service without a caller-provided instant time.
     *
     * @param extraMetadata    optional extra metadata passed to the scheduling call
     * @param tableServiceType table service to schedule
     * @return the result of {@code scheduleTableServiceInternal}
     */
    public Option<String> scheduleTableService(Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
        Option<String> noProvidedInstantTime = Option.empty();
        return scheduleTableServiceInternal(noProvidedInstantTime, extraMetadata, tableServiceType);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules a table service.
     *
     * <p>Returns an empty option when table services are disabled or when the service is
     * {@code ARCHIVE}. {@code CLEAN} is delegated to {@code scheduleCleaning}. Clustering,
     * compaction and log compaction are scheduled inside a transaction state change, using the
     * provided instant time or a newly created one; afterwards the service is offered to the
     * table service manager.
     *
     * @param providedInstantTime optional instant time to schedule at
     * @param extraMetadata       optional extra metadata passed to the table's scheduling call
     * @param tableServiceType    table service to schedule
     * @return the instant time when a plan was produced, otherwise an empty option
     * @throws IllegalArgumentException for any other table service type
     */
    Option<String> scheduleTableServiceInternal(Option<String> providedInstantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
        if (!tableServicesEnabled(config)) {
            return Option.empty();
        }
        if (tableServiceType == TableServiceType.ARCHIVE) {
            LOG.info("Archival does not need scheduling. Skipping.");
            return Option.empty();
        }
        if (tableServiceType == TableServiceType.CLEAN) {
            return scheduleCleaning(createTable(config, storageConf), providedInstantTime);
        }

        txnManager.beginStateChange(Option.empty(), Option.empty());
        try {
            HoodieTable<?, I, ?, T> table = createTable(config, storageConf);
            String instantTime = providedInstantTime.orElseGet(() -> createNewInstantTime(false));
            Option<String> scheduledInstant;
            switch (tableServiceType) {
                case CLUSTER:
                    LOG.info("Scheduling clustering at instant time: {} for table {}", instantTime, config.getBasePath());
                    scheduledInstant = table.scheduleClustering(context, instantTime, extraMetadata).map(plan -> instantTime);
                    break;
                case COMPACT:
                    LOG.info("Scheduling compaction at instant time: {} for table {}", instantTime, config.getBasePath());
                    scheduledInstant = table.scheduleCompaction(context, instantTime, extraMetadata).map(plan -> instantTime);
                    break;
                case LOG_COMPACT:
                    LOG.info("Scheduling log compaction at instant time: {} for table {}", instantTime, config.getBasePath());
                    scheduledInstant = table.scheduleLogCompaction(context, instantTime, extraMetadata).map(plan -> instantTime);
                    break;
                default:
                    throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
            }

            Option<String> delegatedInstants = delegateToTableServiceManager(tableServiceType, table);
            if (delegatedInstants.isPresent()) {
                LOG.info("Delegate instant [{}] to table service manager", delegatedInstants.get());
            }
            return scheduledInstant;
        } finally {
            txnManager.endStateChange(Option.empty());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a new REQUESTED {@code replacecommit} instant for a delete-partition operation
     * and saves it to the active timeline as a pending replace commit.
     *
     * <p>The work runs between {@code txnManager.beginStateChange} and
     * {@code txnManager.endStateChange}; the latter is always invoked.
     *
     * @param metaClient meta client whose instant generator and active timeline are used
     * @return the newly created requested instant
     */
    HoodieInstant startDeletePartitionCommit(HoodieTableMetaClient metaClient) {
        this.txnManager.beginStateChange((Option<HoodieInstant>)Option.empty(), (Option<HoodieInstant>)Option.empty());
        try {
            String newInstantTime = this.createNewInstantTime(false);
            HoodieInstant requestedInstant = metaClient.getInstantGenerator()
                .createNewInstant(HoodieInstant.State.REQUESTED, "replacecommit", newInstantTime);
            HoodieRequestedReplaceMetadata replaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
                .setOperationType(WriteOperationType.DELETE_PARTITION.name())
                .setExtraMetadata(Collections.emptyMap())
                .build();
            metaClient.getActiveTimeline().saveToPendingReplaceCommit(requestedInstant, replaceMetadata);
            return requestedInstant;
        }
        finally {
            this.txnManager.endStateChange((Option<HoodieInstant>)Option.empty());
        }
    }

    /**
     * Builds a table through the supplied factory and, unless validation is skipped,
     * validates the table version against the write config.
     *
     * @param config write config handed to the factory and to the validation
     * @param createTableFn factory invoked with {@code config} and this client's engine context
     * @param skipValidation when {@code true}, the table version validation is not performed
     * @return the table produced by {@code createTableFn}
     */
    protected HoodieTable createTableAndValidate(HoodieWriteConfig config, BiFunction<HoodieWriteConfig, HoodieEngineContext, HoodieTable> createTableFn, boolean skipValidation) {
        HoodieTable createdTable = createTableFn.apply(config, this.context);
        if (skipValidation) {
            return createdTable;
        }
        CommonClientUtils.validateTableVersion(createdTable.getMetaClient().getTableConfig(), config);
        return createdTable;
    }

    /**
     * Creates a table by delegating to the three-argument overload with its boolean flag set to {@code false}.
     *
     * @param config the write config
     * @param storageConf the storage configuration
     * @return the table created by the three-argument overload
     */
    protected HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf) {
        // The rollback path in this class passes a value named skipVersionCheck in this slot.
        final boolean skipVersionCheck = false;
        return this.createTable(config, storageConf, skipVersionCheck);
    }

    /**
     * Creates the {@link HoodieTable} used by this client; implemented by subclasses.
     *
     * @param var1 the write config
     * @param var2 the storage configuration
     * @param var3 boolean flag; callers in this class pass either {@code false} or a
     *             {@code skipVersionCheck} value (presumably controls table version validation; confirm in implementations)
     * @return the table instance
     */
    protected abstract HoodieTable<?, I, ?, T> createTable(HoodieWriteConfig var1, StorageConfiguration<?> var2, boolean var3);

    /**
     * Schedules clustering inline and, if an instant was scheduled, executes it right away.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduling call
     * @return the scheduled clustering instant time, or empty when nothing was scheduled
     */
    protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = this.inlineScheduleClustering(extraMetadata);
        if (scheduledInstant.isPresent()) {
            this.cluster(scheduledInstant.get(), true);
        }
        return scheduledInstant;
    }

    /**
     * Runs inline clustering for the table.
     *
     * <p>When clustering is delegated to the table service manager, only scheduling happens.
     * Otherwise pending clustering is executed first, then a new clustering is scheduled and run.
     *
     * @param table table whose pending clustering instants are executed
     * @param extraMetadata optional extra metadata forwarded to the scheduling call
     */
    private void inlineClustering(HoodieTable table, Option<Map<String, String>> extraMetadata) {
        boolean delegated = this.shouldDelegateToTableServiceManager(this.config, ActionType.clustering);
        if (!delegated) {
            this.runAnyPendingClustering(table);
            this.inlineClustering(extraMetadata);
            return;
        }
        this.scheduleClustering(extraMetadata);
    }

    /**
     * Schedules clustering as part of an inline run by delegating to {@code scheduleClustering}.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduling call
     * @return the value returned by {@code scheduleClustering}
     */
    protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = this.scheduleClustering(extraMetadata);
        return scheduledInstant;
    }

    /**
     * Executes every pending replace/clustering instant on the active timeline that has a clustering plan.
     *
     * @param table table whose active timeline is scanned for pending clustering instants
     */
    protected void runAnyPendingClustering(HoodieTable table) {
        for (HoodieInstant pendingInstant : table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants()) {
            Option<Pair<HoodieInstant, HoodieClusteringPlan>> planOpt = ClusteringUtils.getClusteringPlan(table.getMetaClient(), pendingInstant);
            if (!planOpt.isPresent()) {
                // Pending instant without a clustering plan: nothing to run.
                continue;
            }
            LOG.info("Running pending clustering at instant {} for table {}", planOpt.get().getLeft(), this.config.getBasePath());
            this.cluster(pendingInstant.requestedTime(), true);
        }
    }

    /**
     * Runs a clean: optionally schedules a new clean instant inline, then executes either the
     * newly scheduled instant or the first pending (inflight/requested) clean on the timeline.
     *
     * @param suppliedCleanInstant instant time to use when scheduling inline; passed to {@code scheduleCleaning}
     * @param scheduleInline whether a new clean should be scheduled as part of this call
     * @return the clean metadata returned by the table, or {@code null} when table services are
     *         disabled, when cleaning is delegated to the table service manager, or when there
     *         is no clean instant to execute
     * @throws HoodieIOException declared by the signature; no throw site is visible in this method
     */
    @Nullable
    HoodieCleanMetadata clean(Option<String> suppliedCleanInstant, boolean scheduleInline) throws HoodieIOException {
        if (!this.tableServicesEnabled(this.config)) {
            return null;
        }
        Timer.Context timerContext = this.metrics.getCleanCtx();
        HoodieTable<?, I, ?, T> initialTable = this.createTable(this.config, this.storageConf);
        // If the failed-writes rollback reports true, a fresh table is created; otherwise the initial one is reused.
        HoodieTable<?, I, ?, Object> table = CleanerUtils.rollbackFailedWrites((HoodieFailedWritesCleaningPolicy)this.config.getFailedWritesCleanPolicy(), (String)"clean", (Functions.Function0 & Serializable)() -> this.rollbackFailedWrites(initialTable.getMetaClient())) ? this.createTable(this.config, this.storageConf) : initialTable;
        // First pending (inflight or requested) clean on the timeline, if any.
        Option inflightClean = table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().map(HoodieInstant::requestedTime);
        Option<String> cleanInstantTime = Option.empty();
        // A new clean is only considered when multiple cleans are allowed or none is pending.
        if (this.config.allowMultipleCleans() || inflightClean.isEmpty()) {
            LOG.info("Cleaner started for table {}", (Object)this.config.getBasePath());
            if (scheduleInline) {
                cleanInstantTime = this.scheduleCleaning(table, suppliedCleanInstant);
            }
            if (this.shouldDelegateToTableServiceManager(this.config, ActionType.clean)) {
                LOG.info("Cleaning is not yet supported with Table Service Manager.");
                return null;
            }
        }
        if (inflightClean.isPresent() || cleanInstantTime.isPresent()) {
            table.getMetaClient().reloadActiveTimeline();
            // A newly scheduled instant takes precedence over the pending one.
            String cleanInstantToExecute = cleanInstantTime.isPresent() ? (String)cleanInstantTime.get() : (String)inflightClean.get();
            HoodieCleanMetadata metadata = table.clean(this.context, cleanInstantToExecute);
            // Metrics are only reported when both a timer context and clean metadata are available.
            if (timerContext != null && metadata != null) {
                long durationMs = this.metrics.getDurationInMs(timerContext.stop());
                this.metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
                LOG.info("Cleaned {} files Earliest Retained Instant :{} cleanerElapsedMs: {}", new Object[]{metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain(), durationMs});
            }
            this.releaseResources(cleanInstantToExecute);
            return metadata;
        }
        return null;
    }

    /**
     * Creates a cleaner plan and, if one exists, saves it as a REQUESTED clean instant.
     *
     * <p>The plan is created before {@code txnManager.beginStateChange}; the instant time is
     * resolved and the plan is saved between {@code beginStateChange} and {@code endStateChange}.
     *
     * @param table table used to create the plan and to persist the requested clean
     * @param suppliedCleanInstant instant time to use; a new one is created when empty
     * @return the clean instant time when a plan was saved, otherwise empty
     */
    private Option<String> scheduleCleaning(HoodieTable<?, ?, ?, ?> table, Option<String> suppliedCleanInstant) {
        Option<HoodieCleanerPlan> plan = table.createCleanerPlan(this.context, (Option<Map<String, String>>)Option.empty());
        if (!plan.isPresent()) {
            return Option.empty();
        }
        this.txnManager.beginStateChange((Option<HoodieInstant>)Option.empty(), (Option<HoodieInstant>)Option.empty());
        try {
            String cleanInstantTime = suppliedCleanInstant.orElseGet(() -> this.createNewInstantTime(false));
            HoodieInstant requestedClean = table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED, "clean", cleanInstantTime);
            table.getActiveTimeline().saveToCleanRequested(requestedClean, plan);
            LOG.info("Requesting Cleaning with instant time {}", requestedClean);
            Option<String> delegatedRange = this.delegateToTableServiceManager(TableServiceType.CLEAN, table);
            if (delegatedRange.isPresent()) {
                LOG.info("Delegate instant [{}] to table service manager", delegatedRange.get());
            }
            return Option.of(cleanInstantTime);
        }
        catch (HoodieIOException e) {
            // Logged here for context, then propagated unchanged to the caller.
            LOG.error("Got exception when saving cleaner requested file", e);
            throw e;
        }
        finally {
            this.txnManager.endStateChange((Option<HoodieInstant>)Option.empty());
        }
    }

    /**
     * Archives timeline instants if required and records archive metrics.
     *
     * <p>Does nothing when table services are disabled.
     *
     * @param table table whose timeline is archived
     * @throws HoodieIOException wrapping any {@link IOException} raised while archiving
     */
    protected void archive(HoodieTable table) {
        if (!this.tableServicesEnabled(this.config)) {
            return;
        }
        try {
            Timer.Context archiveTimer = this.metrics.getArchiveCtx();
            HoodieTimelineArchiver timelineArchiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), this.config, table);
            int archivedCount = timelineArchiver.archiveIfRequired(this.context, true);
            if (archiveTimer == null) {
                // No timer context available: skip metrics reporting.
                return;
            }
            long elapsedMs = this.metrics.getDurationInMs(archiveTimer.stop());
            this.metrics.updateArchiveMetrics(elapsedMs, archivedCount);
        }
        catch (IOException ioe) {
            throw new HoodieIOException("Failed to archive", ioe);
        }
    }

    /**
     * Returns the pending commits timeline with compaction excluded and clustering instants filtered out.
     *
     * @param metaClient meta client providing the commits timeline and the instant generator
     * @return the filtered pending timeline
     */
    private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
        HoodieTimeline pendingWithoutCompaction = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
        return pendingWithoutCompaction.filter(instant -> {
            boolean clustering = ClusteringUtils.isClusteringInstant(pendingWithoutCompaction, instant, metaClient.getInstantGenerator());
            return !clustering;
        });
    }

    /**
     * Looks up pending rollback info for a commit, ignoring compaction and clustering instants.
     *
     * @param metaClient meta client of the table
     * @param commitToRollback instant time of the commit being rolled back
     * @return the pending rollback info for that commit, or empty when none is recorded
     */
    protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
        final boolean ignoreCompactionAndClusteringInstants = true;
        return this.getPendingRollbackInfo(metaClient, commitToRollback, ignoreCompactionAndClusteringInstants);
    }

    /**
     * Looks up pending rollback info for a commit.
     *
     * @param metaClient meta client of the table
     * @param commitToRollback instant time of the commit being rolled back
     * @param ignoreCompactionAndClusteringInstants whether rollbacks of compaction and clustering instants are left out
     * @return the entry recorded for that commit, or empty when the commit has no entry
     */
    public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
        Map<String, Option<HoodiePendingRollbackInfo>> pendingByCommit = this.getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants);
        Option<HoodiePendingRollbackInfo> none = Option.empty();
        return pendingByCommit.getOrDefault(commitToRollback, none);
    }

    /**
     * Collects pending rollback infos, ignoring rollbacks of compaction and clustering instants.
     *
     * @param metaClient meta client of the table
     * @return map from the instant time being rolled back to its pending rollback info
     */
    protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
        final boolean ignoreCompactionAndClusteringInstants = true;
        return this.getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants);
    }

    /**
     * Builds a map from the instant time being rolled back to its pending rollback instant and plan.
     *
     * <p>For each pending rollback instant the plan is loaded. If loading fails and the rollback
     * is in REQUESTED state, the pending rollback is deleted (best effort); in any failure case
     * the instant is skipped. Only the first rollback seen for a given target instant is kept.
     *
     * @param metaClient meta client of the table
     * @param ignoreCompactionAndClusteringInstants when {@code true}, rollbacks targeting compaction
     *        or clustering instants are left out of the result
     * @return mutable map keyed by the instant time being rolled back
     */
    protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
        List<HoodieInstant> pendingRollbackInstants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants();
        Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
        for (HoodieInstant rollbackInstant : pendingRollbackInstants) {
            HoodieRollbackPlan rollbackPlan;
            try {
                rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
            }
            catch (Exception e) {
                if (!rollbackInstant.isRequested()) {
                    LOG.error("Fetching rollback plan failed for {}, skip the plan", rollbackInstant, e);
                    continue;
                }
                LOG.warn("Fetching rollback plan failed for {}, deleting the plan since it's in REQUESTED state", rollbackInstant, e);
                try {
                    metaClient.getActiveTimeline().deletePending(rollbackInstant);
                }
                catch (HoodieIOException he) {
                    // Deletion is best effort; a failure is only logged.
                    LOG.warn("Cannot delete {}", rollbackInstant, he);
                }
                continue;
            }
            try {
                String action = rollbackPlan.getInstantToRollback().getAction();
                String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
                if (ignoreCompactionAndClusteringInstants) {
                    if ("compaction".equals(action)) {
                        continue;
                    }
                    InstantGenerator instantGenerator = metaClient.getInstantGenerator();
                    HoodieInstant inflightTarget = instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, action, instantToRollback);
                    if (ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(), inflightTarget, instantGenerator)) {
                        continue;
                    }
                }
                infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
            }
            catch (Exception e) {
                LOG.warn("Processing rollback plan failed for {}, skip the plan", rollbackInstant, e);
            }
        }
        return infoMap;
    }

    /**
     * Rolls back failed indexing commits together with any pending rollbacks.
     *
     * @return {@code true} when there was at least one instant to roll back
     */
    protected boolean rollbackFailedIndexingCommits() {
        HoodieTable<?, I, ?, T> table = this.createTable(this.config, this.storageConf);
        List<String> failedIndexingInstants = this.getFailedIndexingCommitsToRollbackForMetadataTable(table.getMetaClient());
        Map<String, Option<HoodiePendingRollbackInfo>> rollbackTargets = this.getPendingRollbackInfos(table.getMetaClient());
        // Instants without a pending rollback are added with an empty rollback info.
        for (String instantTime : failedIndexingInstants) {
            rollbackTargets.putIfAbsent(instantTime, Option.empty());
        }
        this.rollbackFailedWrites(rollbackTargets);
        return !rollbackTargets.isEmpty();
    }

    /**
     * Finds incomplete indexing commits on a metadata table whose heartbeat has expired.
     *
     * @param metaClient meta client of the table being inspected
     * @return requested times of expired incomplete indexing commits; empty when
     *         {@code metaClient} does not point at a metadata table
     * @throws HoodieException when a heartbeat check fails with an {@link IOException}
     */
    private List<String> getFailedIndexingCommitsToRollbackForMetadataTable(HoodieTableMetaClient metaClient) {
        boolean metadataTable = HoodieTableMetadata.isMetadataTable((StoragePath)metaClient.getBasePath());
        if (!metadataTable) {
            return Collections.emptyList();
        }
        HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
            .setBasePath(HoodieTableMetadata.getDatasetBasePath((String)this.config.getBasePath()))
            .setConf(metaClient.getStorageConf().newInstance())
            .build();
        HoodieTimeline dataIndexTimeline = dataMetaClient.getActiveTimeline().filter(instant -> instant.getAction().equals("indexing"));
        Stream<HoodieInstant> incompleteIndexingCommits = metaClient.getCommitsTimeline()
            .filter(instant -> !instant.isCompleted() && HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline, instant.requestedTime()))
            .getInstantsAsStream();
        return incompleteIndexingCommits
            .filter(instant -> {
                try {
                    return this.heartbeatClient.isHeartbeatExpired(instant.requestedTime());
                }
                catch (IOException io) {
                    throw new HoodieException("Failed to check heartbeat for instant " + instant, (Throwable)io);
                }
            })
            .map(HoodieInstant::requestedTime)
            .collect(Collectors.toList());
    }

    /**
     * Rolls back failed writes selected by the configured failed-writes cleaning policy,
     * together with any pending rollbacks.
     *
     * @param metaClient meta client of the table
     * @return {@code true} when there was at least one instant to roll back
     */
    protected boolean rollbackFailedWrites(HoodieTableMetaClient metaClient) {
        Option<String> noCurrentInstant = Option.empty();
        List<String> failedInstants = this.getInstantsToRollback(metaClient, this.config.getFailedWritesCleanPolicy(), noCurrentInstant);
        Map<String, Option<HoodiePendingRollbackInfo>> rollbackTargets = this.getPendingRollbackInfos(metaClient);
        // Instants without a pending rollback are added with an empty rollback info.
        for (String instantTime : failedInstants) {
            rollbackTargets.putIfAbsent(instantTime, Option.empty());
        }
        this.rollbackFailedWrites(rollbackTargets);
        return !rollbackTargets.isEmpty();
    }

    /**
     * Rolls back the given instants with locking and version check both enabled.
     *
     * @param instantsToRollback map from instant time to its pending rollback info (possibly empty)
     */
    protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback) {
        final boolean skipLocking = false;
        final boolean skipVersionCheck = false;
        this.rollbackFailedWrites(instantsToRollback, skipLocking, skipVersionCheck);
    }

    /**
     * Rolls back the given instants from the largest instant time to the smallest.
     *
     * <p>On a non-metadata table, the first instant whose time is less than or equal to
     * {@code "00000000000002"} triggers {@code rollbackFailedBootstrap} and ends the loop.
     * The heartbeat file of every processed instant is deleted.
     *
     * @param instantsToRollback map from instant time to its pending rollback info (possibly empty)
     * @param skipLocking forwarded to {@code rollback}
     * @param skipVersionCheck forwarded to {@code rollback}
     */
    protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking, boolean skipVersionCheck) {
        List<Map.Entry<String, Option<HoodiePendingRollbackInfo>>> newestFirst = instantsToRollback.entrySet().stream()
            .sorted((left, right) -> right.getKey().compareTo(left.getKey()))
            .collect(Collectors.toList());
        boolean isMetadataTable = HoodieTableMetadata.isMetadataTable((String)this.basePath);
        for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> target : newestFirst) {
            String instantTime = target.getKey();
            boolean bootstrapInstant = !isMetadataTable
                && InstantComparison.compareTimestamps(instantTime, InstantComparison.LESSER_THAN_OR_EQUALS, "00000000000002");
            if (bootstrapInstant) {
                this.rollbackFailedBootstrap();
                HeartbeatUtils.deleteHeartbeatFile(this.storage, this.basePath, instantTime, this.config);
                break;
            }
            this.rollback(instantTime, target.getValue(), skipLocking, skipVersionCheck);
            HeartbeatUtils.deleteHeartbeatFile(this.storage, this.basePath, instantTime, this.config);
        }
    }

    /**
     * Selects the pending instants to roll back according to the failed-writes cleaning policy.
     *
     * <p>Candidates are pending commits excluding compaction and clustering, in reverse order.
     * <ul>
     *   <li>Eager: every candidate except {@code curInstantTime} when it is present. On a
     *       metadata table with no {@code curInstantTime}, indexing commits are excluded instead.</li>
     *   <li>Lazy: delegated to {@code getInstantsToRollbackForLazyCleanPolicy}.</li>
     *   <li>Never: an empty list.</li>
     * </ul>
     *
     * @param metaClient meta client of the table
     * @param cleaningPolicy policy deciding which candidates are selected
     * @param curInstantTime instant time to leave out under the eager policy, if present
     * @return instant times to roll back
     * @throws IllegalArgumentException when the policy is neither eager, lazy nor never
     */
    protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
        Stream<HoodieInstant> inflightInstantsStream = this.getInflightTimelineExcludeCompactionAndClustering(metaClient).getReverseOrderedInstants();
        if (cleaningPolicy.isEager()) {
            Stream<String> inflightTimes = inflightInstantsStream.map(HoodieInstant::requestedTime);
            if (!metaClient.isMetadataTable()) {
                return inflightTimes
                    .filter(time -> !curInstantTime.isPresent() || !time.equals(curInstantTime.get()))
                    .collect(Collectors.toList());
            }
            HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
                .setBasePath(HoodieTableMetadata.getDatasetBasePath((String)this.config.getBasePath()))
                .setConf(metaClient.getStorageConf().newInstance())
                .build();
            HoodieTimeline dataIndexTimeline = dataMetaClient.getActiveTimeline().filter(instant -> instant.getAction().equals("indexing"));
            return inflightTimes
                .filter(time -> curInstantTime.isPresent()
                    ? !time.equals(curInstantTime.get())
                    : !HoodieTableMetadataUtil.isIndexingCommit(dataIndexTimeline, time))
                .collect(Collectors.toList());
        }
        if (cleaningPolicy.isLazy()) {
            return this.getInstantsToRollbackForLazyCleanPolicy(metaClient, inflightInstantsStream);
        }
        if (cleaningPolicy.isNever()) {
            return Collections.emptyList();
        }
        throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + this.config.getFailedWritesCleanPolicy());
    }

    /**
     * Selects pending instants whose heartbeat has expired and that are still pending after a timeline reload.
     *
     * @param metaClient meta client whose active timeline is reloaded when expired instants exist
     * @param inflightInstantsStream candidate pending instants
     * @return requested times of expired instants still present in the refreshed pending timeline
     * @throws HoodieException when a heartbeat check fails with an {@link IOException}
     */
    private List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient, Stream<HoodieInstant> inflightInstantsStream) {
        List<String> expiredInstants = inflightInstantsStream
            .filter(instant -> {
                try {
                    return this.heartbeatClient.isHeartbeatExpired(instant.requestedTime());
                }
                catch (IOException io) {
                    throw new HoodieException("Failed to check heartbeat for instant " + instant, (Throwable)io);
                }
            })
            .map(HoodieInstant::requestedTime)
            .collect(Collectors.toList());
        if (expiredInstants.isEmpty()) {
            return Collections.emptyList();
        }
        // Re-read the timeline and keep only instants that are still pending in it.
        metaClient.reloadActiveTimeline();
        HoodieTimeline refreshedInflightTimeline = this.getInflightTimelineExcludeCompactionAndClustering(metaClient);
        return expiredInstants.stream()
            .filter(instantTime -> refreshedInflightTimeline.containsInstant(instantTime))
            .collect(Collectors.toList());
    }

    /**
     * Rolls back a commit without a caller-supplied rollback instant time.
     *
     * @param commitInstantTime instant time of the commit to roll back
     * @param pendingRollbackInfo pending rollback instant and plan to reuse, if any
     * @param skipLocking forwarded to the five-argument overload
     * @param skipVersionCheck forwarded to the five-argument overload
     * @return the result of the five-argument overload
     * @throws HoodieRollbackException when the rollback fails
     */
    @Deprecated
    public boolean rollback(String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking, boolean skipVersionCheck) throws HoodieRollbackException {
        Option<String> noSuppliedRollbackInstantTime = Option.empty();
        return this.rollback(commitInstantTime, pendingRollbackInfo, noSuppliedRollbackInstantTime, skipLocking, skipVersionCheck);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Rolls back the commit with the given instant time.
     *
     * <p>When {@code pendingRollbackInfo} is present its plan and rollback instant time are
     * reused. Otherwise the commit is looked up on the commits timeline and a new rollback is
     * scheduled; scheduling is wrapped in {@code txnManager.beginStateChange} /
     * {@code endStateChange} unless {@code skipLocking} is set.
     *
     * @param commitInstantTime instant time of the commit to roll back
     * @param pendingRollbackInfo pending rollback instant and plan to reuse, if any
     * @param suppliedRollbackInstantTime instant time for a newly scheduled rollback; a new one is created when empty
     * @param skipLocking when {@code true}, scheduling is not wrapped in a state change; also forwarded to {@code table.rollback}
     * @param skipVersionCheck forwarded to {@code createTable}
     * @return {@code true} after the rollback was executed; {@code false} when no pending rollback
     *         info was given and the commit is not on the commits timeline
     * @throws HoodieRollbackException when no rollback plan is available or any exception occurs;
     *         every exception raised inside the outer try, including the one thrown for a
     *         missing plan, is wrapped by the outer catch
     */
    @Deprecated
    public boolean rollback(String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, Option<String> suppliedRollbackInstantTime, boolean skipLocking, boolean skipVersionCheck) throws HoodieRollbackException {
        LOG.info("Begin rollback of instant {} for table {}", (Object)commitInstantTime, (Object)this.config.getBasePath());
        Timer.Context timerContext = this.metrics.getRollbackCtx();
        try {
            String rollbackInstantTime;
            Option<HoodieRollbackPlan> rollbackPlanOption;
            HoodieTable<?, I, ?, T> table = this.createTable(this.config, this.storageConf, skipVersionCheck);
            // Locate the commit on the commits timeline by its requested time.
            Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream().filter(instant -> InstantComparison.EQUALS.test(instant.requestedTime(), commitInstantTime)).findFirst());
            if (pendingRollbackInfo.isPresent()) {
                // Reuse the already-scheduled rollback.
                rollbackPlanOption = Option.of((Object)((HoodiePendingRollbackInfo)pendingRollbackInfo.get()).getRollbackPlan());
                rollbackInstantTime = ((HoodiePendingRollbackInfo)pendingRollbackInfo.get()).getRollbackInstant().requestedTime();
            } else {
                if (commitInstantOpt.isEmpty()) {
                    LOG.error("Cannot find instant {} in the timeline of table {} for rollback", (Object)commitInstantTime, (Object)this.config.getBasePath());
                    return false;
                }
                if (!skipLocking) {
                    this.txnManager.beginStateChange((Option<HoodieInstant>)Option.empty(), (Option<HoodieInstant>)Option.empty());
                }
                try {
                    rollbackInstantTime = (String)suppliedRollbackInstantTime.orElseGet(() -> this.createNewInstantTime(false));
                    rollbackPlanOption = table.scheduleRollback(this.context, rollbackInstantTime, (HoodieInstant)commitInstantOpt.get(), false, this.config.shouldRollbackUsingMarkers(), false);
                }
                finally {
                    if (!skipLocking) {
                        this.txnManager.endStateChange((Option<HoodieInstant>)Option.empty());
                    }
                }
            }
            if (rollbackPlanOption.isPresent()) {
                HoodieRollbackMetadata rollbackMetadata;
                // If the commit is no longer on the timeline, an INFLIGHT instant is rebuilt from the plan's action.
                HoodieRollbackMetadata hoodieRollbackMetadata = rollbackMetadata = commitInstantOpt.isPresent() ? table.rollback(this.context, rollbackInstantTime, (HoodieInstant)commitInstantOpt.get(), true, skipLocking) : table.rollback(this.context, rollbackInstantTime, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, ((HoodieRollbackPlan)rollbackPlanOption.get()).getInstantToRollback().getAction(), commitInstantTime), false, skipLocking);
                if (timerContext != null) {
                    long durationInMs = this.metrics.getDurationInMs(timerContext.stop());
                    this.metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
                }
                return true;
            }
            throw new HoodieRollbackException("Failed to rollback " + this.config.getBasePath() + " commits " + commitInstantTime);
        }
        catch (Exception e) {
            throw new HoodieRollbackException("Failed to rollback " + this.config.getBasePath() + " commits " + commitInstantTime, e);
        }
    }

    /**
     * Rolls back a pending bootstrap, if one exists, and then deletes the metadata table.
     *
     * <p>A bootstrap is considered pending when the latest pending commit (excluding compaction
     * and log compaction) has a requested time less than or equal to {@code "00000000000002"}.
     * The metadata table is deleted whether or not a bootstrap was rolled back.
     */
    public void rollbackFailedBootstrap() {
        LOG.info("Rolling back pending bootstrap if present");
        HoodieTable<?, I, ?, T> table = this.createTable(this.config, this.storageConf);
        HoodieTimeline pendingTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction();
        Option<String> latestPendingTime = Option.fromJavaOptional(
            pendingTimeline.getReverseOrderedInstants().map(HoodieInstant::requestedTime).findFirst());
        boolean bootstrapPending = latestPendingTime.isPresent()
            && InstantComparison.compareTimestamps(latestPendingTime.get(), InstantComparison.LESSER_THAN_OR_EQUALS, "00000000000002");
        if (bootstrapPending) {
            LOG.info("Found pending bootstrap instants. Rolling them back");
            this.executeUsingTxnManager((Option<HoodieInstant>)Option.empty(), () -> table.rollbackBootstrap(this.context, this.createNewInstantTime(false)));
            LOG.info("Finished rolling back pending bootstrap");
        }
        HoodieTableMetadataUtil.deleteMetadataTable((String)this.config.getBasePath(), (HoodieEngineContext)this.context);
    }

    /**
     * Reports whether a pre-commit step is required.
     *
     * @return the answer given by the write conflict resolution strategy taken from the write config
     */
    protected boolean isPreCommitRequired() {
        return this.config.getWriteConflictResolutionStrategy().isPreCommitRequired();
    }

    /**
     * Delegates execution of a table service to the table service manager, if it is enabled
     * for the action that corresponds to the requested service.
     *
     * <p>The gate checks the action matching {@code tableServiceType} (compaction, clustering
     * or clean), in line with the per-action delegation checks used elsewhere in this class,
     * instead of always checking the compaction action.
     *
     * @param tableServiceType the table service to delegate
     * @param table table whose meta client is handed to the table service manager client
     * @return the value returned by the table service manager client, or empty when the service
     *         type cannot be delegated or the manager is not enabled for the matching action
     */
    private Option<String> delegateToTableServiceManager(TableServiceType tableServiceType, HoodieTable table) {
        ActionType delegatedAction;
        switch (tableServiceType) {
            case COMPACT:
                delegatedAction = ActionType.compaction;
                break;
            case CLUSTER:
                delegatedAction = ActionType.clustering;
                break;
            case CLEAN:
                delegatedAction = ActionType.clean;
                break;
            default:
                // Logged at debug because this path is now reached even when the manager is disabled.
                LOG.debug("Not supported delegate to table service manager, tableServiceType : {}", tableServiceType.getAction());
                return Option.empty();
        }
        if (!this.config.getTableServiceManagerConfig().isEnabledAndActionSupported(delegatedAction)) {
            return Option.empty();
        }
        HoodieTableServiceManagerClient tableServiceManagerClient = new HoodieTableServiceManagerClient(table.getMetaClient(), this.config.getTableServiceManagerConfig());
        switch (tableServiceType) {
            case COMPACT:
                return tableServiceManagerClient.executeCompaction();
            case CLUSTER:
                return tableServiceManagerClient.executeClustering();
            default:
                // Only CLEAN can reach this point; other types returned above.
                return tableServiceManagerClient.executeClean();
        }
    }

    /**
     * Force-shuts down the async archive and async cleaner services, clears their references,
     * and then closes the parent client.
     */
    @Override
    public void close() {
        AsyncArchiveService.forceShutdown(this.asyncArchiveService);
        this.asyncArchiveService = null;
        AsyncCleanerService.forceShutdown(this.asyncCleanerService);
        this.asyncCleanerService = null;
        super.close();
    }

    /**
     * Fails the table service when the write stats report any write errors.
     *
     * @param writeStats write stats produced by the table service
     * @param tableServiceType service that produced the stats; selects the exception type
     * @throws HoodieClusteringException for {@code CLUSTER}
     * @throws HoodieLogCompactException for {@code LOG_COMPACT}
     * @throws HoodieCompactionException for {@code COMPACT}
     * @throws HoodieException for any other service type
     */
    protected void handleWriteErrors(List<HoodieWriteStat> writeStats, TableServiceType tableServiceType) {
        long totalWriteErrors = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
        if (totalWriteErrors <= 0L) {
            return;
        }
        String failedFileIds = writeStats.stream()
            .filter(stat -> stat.getTotalWriteErrors() > 0L)
            .map(HoodieWriteStat::getFileId)
            .collect(Collectors.joining(","));
        String message = tableServiceType + " failed to write to files:" + failedFileIds;
        switch (tableServiceType) {
            case CLUSTER:
                throw new HoodieClusteringException(message);
            case LOG_COMPACT:
                throw new HoodieLogCompactException(message);
            case COMPACT:
                throw new HoodieCompactionException(message);
            default:
                throw new HoodieException(message);
        }
    }

    /**
     * No-op override: this class does nothing with the supplied columns.
     *
     * @param metaClient unused here
     * @param columnsToIndex unused here
     */
    @Override
    protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) {
    }

    /**
     * Hook invoked after a clean has run for the given instant; empty in this class.
     * NOTE(review): presumably overridden by engine-specific subclasses; confirm.
     *
     * @param instantTime instant time whose resources may be released
     */
    protected void releaseResources(String instantTime) {
    }
}

