/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.BaseHoodieClient;
import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.RunsTableService;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.hudi.internal.schema.action.TableChange;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O>
extends BaseHoodieClient
implements RunsTableService {
    protected static final String LOOKUP_STR = "lookup";
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
    // Index produced by createIndex(writeConfig) in the constructor.
    private final transient HoodieIndex<?, ?> index;
    // Helper handed in by the caller at construction time.
    private final SupportsUpgradeDowngrade upgradeDowngradeHelper;
    // Operation type of the current write; set by setOperationType/preWrite and passed to CommitUtils.buildMetadata.
    private transient WriteOperationType operationType;
    // Lazily created in commitStats the first time the commit callback is enabled.
    private transient HoodieWriteCommitCallback commitCallback;
    protected final transient HoodieMetrics metrics;
    // Stopped and reset to null by emitCommitMetrics; a null value means no commit duration is reported.
    protected transient Timer.Context writeTimer = null;
    protected transient Timer.Context compactionTimer;
    protected transient Timer.Context clusteringTimer;
    // Started (or restarted) in preWrite; awaited in autoCleanOnCommit when async cleaning is configured.
    protected transient AsyncCleanerService asyncCleanerService;
    // Started (or restarted) in preWrite; awaited in autoArchiveOnCommit when async archival is configured.
    protected transient AsyncArchiveService asyncArchiveService;
    // Brackets the commit in commitStats with beginTransaction/endTransaction.
    protected final TransactionManager txnManager;
    // Refreshed in preWrite; stays empty unless the transaction manager reports optimistic concurrency control.
    protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata = Option.empty();
    // Snapshot taken in preWrite of inflight/requested instants, minus the instant being written.
    protected Set<String> pendingInflightAndRequestedInstants;

    /**
     * Creates a write client without a caller-supplied timeline service by delegating to the
     * four-argument constructor with {@code Option.empty()}.
     *
     * @param context engine context forwarded to the base client
     * @param writeConfig write configuration for this client
     * @param upgradeDowngradeHelper helper retained by this client
     */
    @Deprecated
    public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
        this(context, writeConfig, Option.empty(), upgradeDowngradeHelper);
    }

    /**
     * Creates a write client, initialising metrics, the index and the transaction manager after
     * the base client has been set up.
     *
     * @param context engine context forwarded to the base client
     * @param writeConfig write configuration; also passed to {@link #createIndex(HoodieWriteConfig)}
     * @param timelineService optional timeline service forwarded to the base client
     * @param upgradeDowngradeHelper helper retained by this client
     */
    @Deprecated
    public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
        super(context, writeConfig, timelineService);
        // Order is kept deliberately: createIndex is an overridable hook invoked during construction.
        metrics = new HoodieMetrics(config);
        index = createIndex(writeConfig);
        txnManager = new TransactionManager(config, fs);
        this.upgradeDowngradeHelper = upgradeDowngradeHelper;
    }

    /**
     * Builds the index used by this client. Invoked once from the constructor with the write
     * configuration, i.e. before any subclass constructor body has run.
     *
     * @param var1 the write configuration passed to the constructor
     * @return the index instance stored by this client
     */
    protected abstract HoodieIndex<?, ?> createIndex(HoodieWriteConfig var1);

    /**
     * Records the write operation type; the stored value is later passed to
     * {@code CommitUtils.buildMetadata} by {@code commitStats}.
     *
     * @param operationType operation type of the write in progress
     */
    public void setOperationType(WriteOperationType operationType) {
        this.operationType = operationType;
    }

    /**
     * @return the operation type most recently stored via {@link #setOperationType}; may be
     *         {@code null} if none was set
     */
    public WriteOperationType getOperationType() {
        return operationType;
    }

    /**
     * Commits the given write statuses without any extra metadata.
     *
     * @param instantTime instant being committed
     * @param writeStatuses engine-specific write statuses
     * @return result of the delegated commit
     */
    public boolean commit(String instantTime, O writeStatuses) {
        final Option<Map<String, String>> noExtraMetadata = Option.empty();
        return commit(instantTime, writeStatuses, noExtraMetadata);
    }

    /**
     * Commits the given write statuses using the commit action type reported by a freshly
     * created meta client, and with no replaced file ids.
     *
     * @param instantTime instant being committed
     * @param writeStatuses engine-specific write statuses
     * @param extraMetadata optional extra metadata to attach to the commit
     * @return result of the delegated commit
     */
    public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata) {
        final String actionType = createMetaClient(false).getCommitActionType();
        return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
    }

    /**
     * Engine-specific commit. The overloads in this class call it with, in order: the instant
     * time, the write statuses, optional extra metadata, the commit action type, and a map that
     * they pass as empty.
     * NOTE(review): the last map has the same type as {@code partitionToReplaceFileIds} in
     * {@code commitStats} and presumably carries replaced file ids per partition — confirm
     * against implementations.
     */
    public abstract boolean commit(String var1, O var2, Option<Map<String, String>> var3, String var4, Map<String, List<String>> var5);

    /**
     * Commits the given write stats with no replaced file ids.
     *
     * @param instantTime instant being committed
     * @param stats write stats to commit
     * @param extraMetadata optional extra metadata to attach to the commit
     * @param commitActionType timeline action type of the commit
     * @return result of the delegated {@code commitStats}
     */
    public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata, String commitActionType) {
        final Map<String, List<String>> noReplacedFileIds = Collections.emptyMap();
        return commitStats(instantTime, stats, extraMetadata, commitActionType, noReplacedFileIds);
    }

    /**
     * Commits the given write stats to the timeline.
     *
     * <p>The commit itself (pre-commit hook, timeline commit, post-commit) runs between
     * {@code beginTransaction} and {@code endTransaction}. Inline table services run afterwards,
     * outside that transaction. Finally commit metrics are emitted and the optional commit
     * callback is invoked.
     *
     * @param instantTime instant being committed
     * @param stats write stats to commit; if empty and empty commits are not allowed, nothing is done
     * @param extraMetadata optional extra metadata to attach to the commit
     * @param commitActionType timeline action type of the commit
     * @param partitionToReplaceFileIds file ids passed to {@code CommitUtils.buildMetadata}
     * @return always {@code true} when the method returns normally
     * @throws HoodieCommitException if the commit phase fails with an {@link IOException}
     */
    public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
        if (!this.config.allowEmptyCommit() && stats.isEmpty()) {
            return true;
        }
        LOG.info("Committing " + instantTime + " action " + commitActionType);
        HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
        HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, this.operationType, this.config.getWriteSchema(), commitActionType);
        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
        HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, this.heartbeatClient, this.config);
        Option<HoodieInstant> lastCompletedInstant = this.lastCompletedTxnAndMetadata.isPresent()
                ? Option.of(this.lastCompletedTxnAndMetadata.get().getLeft())
                : Option.empty();
        this.txnManager.beginTransaction(Option.of(inflightInstant), lastCompletedInstant);
        try {
            this.preCommit(inflightInstant, metadata);
            this.commit(table, commitActionType, instantTime, metadata, stats);
            // Passes false for acquireLockForArchival: we are still inside the transaction here.
            this.postCommit(table, metadata, instantTime, extraMetadata, false);
            LOG.info("Committed " + instantTime);
            this.releaseResources();
        } catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + instantTime, e);
        } finally {
            this.txnManager.endTransaction(Option.of(inflightInstant));
        }
        try {
            // Runs after endTransaction, i.e. outside the commit transaction.
            this.runTableServicesInline(table, metadata, extraMetadata);
        } catch (Exception e) {
            if (this.config.isFailOnInlineTableServiceExceptionEnabled()) {
                throw e;
            }
            // The failure is tolerated, so keep the full stack trace in the log for diagnosis.
            LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage() + ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.", e);
        }
        this.emitCommitMetrics(instantTime, metadata, commitActionType);
        if (this.config.writeCommitCallbackOn()) {
            if (null == this.commitCallback) {
                this.commitCallback = HoodieCommitCallbackFactory.create(this.config);
            }
            this.commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, this.config.getTableName(), this.config.getBasePath(), stats));
        }
        return true;
    }

    /**
     * Finalizes the write, optionally persists the internal schema, updates the metadata table
     * and then marks the instant complete on the active timeline.
     *
     * @param table table being written to
     * @param commitActionType timeline action type of the commit
     * @param instantTime instant being committed
     * @param metadata commit metadata; serialized as UTF-8 JSON into the completed instant
     * @param stats write stats handed to {@code finalizeWrite}
     * @throws IOException propagated from the steps above
     */
    protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata, List<HoodieWriteStat> stats) throws IOException {
        LOG.info("Committing " + instantTime + " action " + commitActionType);
        HoodieActiveTimeline timeline = table.getActiveTimeline();
        finalizeWrite(table, instantTime, stats);
        // Only when "schema" is present, "latest_schema" is not, and schema evolution is enabled.
        boolean shouldSaveInternalSchema = !metadata.getExtraMetadata().containsKey("latest_schema")
                && metadata.getExtraMetadata().containsKey("schema")
                && table.getConfig().getSchemaEvolutionEnable();
        if (shouldSaveInternalSchema) {
            saveInternalSchema(table, instantTime, metadata);
        }
        writeTableMetadata(table, instantTime, commitActionType, metadata);
        HoodieInstant instantToComplete = new HoodieInstant(true, commitActionType, instantTime);
        byte[] serializedMetadata = metadata.toJsonString().getBytes(StandardCharsets.UTF_8);
        timeline.saveAsComplete(instantToComplete, Option.of(serializedMetadata));
    }

    /**
     * Reconciles the writer's Avro schema with the table's schema history and records the
     * result both in the commit metadata and in the file-based schema history store.
     *
     * <p>Does nothing unless a history schema string exists or the
     * {@code HoodieCommonConfig.RECONCILE_SCHEMA} config value parses to {@code true}.
     *
     * @param table table whose meta client is used to read and persist schema history
     * @param instantTime instant being committed; parsed as a {@code long} for schema ids/lookups
     * @param metadata commit metadata that receives the "latest_schema" and "schema" entries
     */
    private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
        TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
        // An empty string stands for "no schema history found".
        String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
        FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
        if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(this.config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
            InternalSchema internalSchema;
            Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(this.config.getSchema(), this.config.allowOperationMetadataField());
            if (historySchemaStr.isEmpty()) {
                // No history yet: start from the write schema and use the instant time as its id.
                internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
                internalSchema.setSchemaId(Long.parseLong(instantTime));
            } else {
                // Look up the schema for this instant in the parsed history.
                internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), SerDeHelper.parseSchemas(historySchemaStr));
            }
            InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
            if (evolvedSchema.equals(internalSchema)) {
                // Reconciliation changed nothing: keep the history as is, or seed it when it was empty.
                metadata.addMetadata("latest_schema", SerDeHelper.toJson(evolvedSchema));
                schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr.isEmpty() ? SerDeHelper.inheritSchemas(evolvedSchema, "") : historySchemaStr);
            } else {
                // Schema changed: give it this instant's id and persist it together with the prior history.
                evolvedSchema.setSchemaId(Long.parseLong(instantTime));
                String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
                metadata.addMetadata("latest_schema", newSchemaStr);
                schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
            }
            // Overwrite the "schema" entry with the Avro form of the reconciled schema.
            metadata.addMetadata("schema", AvroInternalSchemaConverter.convert(evolvedSchema, avroSchema.getName()).toString());
        }
    }

    /**
     * Creates the engine-specific table abstraction. Callers in this class pass the client's
     * write config and Hadoop configuration.
     */
    protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig var1, Configuration var2);

    /**
     * Stops the write timer, if one is running, and reports commit metrics. Metrics are only
     * reported when the instant time can be parsed into a date; the timer is cleared either way.
     *
     * @param instantTime instant that was committed
     * @param metadata commit metadata forwarded to the metrics update
     * @param actionType timeline action type forwarded to the metrics update
     */
    void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
        if (writeTimer == null) {
            return;
        }
        final long elapsedMs = metrics.getDurationInMs(writeTimer.stop());
        HoodieActiveTimeline.parseDateFromInstantTimeSafely(instantTime)
                .ifPresent(commitDate -> metrics.updateCommitMetrics(commitDate.getTime(), elapsedMs, metadata, actionType));
        writeTimer = null;
    }

    /**
     * Hook called by {@code commitStats} inside the transaction, before the commit is written.
     * The base implementation does nothing.
     *
     * @param inflightInstant inflight instant being committed
     * @param metadata commit metadata built for this commit
     */
    protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
    }

    /**
     * Sets a job status on the engine context and, if the table provides a metadata writer for
     * this instant, applies the commit metadata to it.
     *
     * @param table table being written to
     * @param instantTime instant being committed
     * @param actionType timeline action type, used to ask the table whether this is a table service action
     * @param metadata commit metadata to apply
     */
    protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
        final String activityName = getClass().getSimpleName();
        final String statusMessage = "Committing to metadata table: " + config.getTableName();
        context.setJobStatus(activityName, statusMessage);
        table.getMetadataWriter(instantTime).ifPresent(writer ->
                ((HoodieTableMetadataWriter)writer).update(metadata, instantTime, table.isTableServiceAction(actionType, instantTime)));
    }

    /**
     * Engine-specific filter over an input of type {@code I}, returning the same type.
     * NOTE(review): by its name this presumably drops records that already exist in the table —
     * confirm against implementations.
     */
    public abstract I filterExists(I var1);

    /**
     * Bootstraps the table after rolling back any pending bootstrap.
     *
     * @param extraMetadata optional extra metadata forwarded to the table's bootstrap
     * @throws HoodieException if the configured write concurrency mode supports optimistic
     *         concurrency control
     */
    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        final boolean multiWriterMode = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
        if (multiWriterMode) {
            throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
        }
        HoodieTable table = initTable(WriteOperationType.UPSERT, Option.ofNullable("00000000000001"));
        rollbackFailedBootstrap();
        table.bootstrap(context, extraMetadata);
    }

    /**
     * Rolls back a pending bootstrap when the most recent pending (non-compaction) commit
     * instant has a timestamp less than or equal to {@code "00000000000002"}.
     */
    protected void rollbackFailedBootstrap() {
        LOG.info("Rolling back pending bootstrap if present");
        HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
        HoodieTimeline pendingTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
        Option<String> latestPending = Option.fromJavaOptional(
                pendingTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
        if (!latestPending.isPresent()) {
            return;
        }
        if (!HoodieTimeline.compareTimestamps(latestPending.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS, "00000000000002")) {
            return;
        }
        LOG.info("Found pending bootstrap instants. Rolling them back");
        table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime());
        LOG.info("Finished rolling back pending bootstrap");
    }

    // Engine-specific write operations. Parameter names were lost in decompilation.
    // NOTE(review): in each signature the I/K argument is presumably the input records/keys and
    // the String the instant time of the write — confirm against implementations.

    /** Upserts the given input and returns the engine-specific result of type {@code O}. */
    public abstract O upsert(I var1, String var2);

    /** Variant of upsert; presumably for records that were already prepared by the caller — confirm. */
    public abstract O upsertPreppedRecords(I var1, String var2);

    /** Inserts the given input and returns the engine-specific result of type {@code O}. */
    public abstract O insert(I var1, String var2);

    /** Variant of insert; presumably for records that were already prepared by the caller — confirm. */
    public abstract O insertPreppedRecords(I var1, String var2);

    /** Bulk-inserts the given input and returns the engine-specific result of type {@code O}. */
    public abstract O bulkInsert(I var1, String var2);

    /** Bulk insert that additionally accepts an optional {@link BulkInsertPartitioner}. */
    public abstract O bulkInsert(I var1, String var2, Option<BulkInsertPartitioner> var3);

    /** Variant of bulk insert with an optional partitioner; presumably for already prepared records — confirm. */
    public abstract O bulkInsertPreppedRecords(I var1, String var2, Option<BulkInsertPartitioner> var3);

    /** Deletes using an input of type {@code K} and returns the engine-specific result of type {@code O}. */
    public abstract O delete(K var1, String var2);

    /**
     * Prepares client state before a write: records the operation type, captures the last
     * completed transaction (only when optimistic concurrency control is enabled) and the
     * pending instants other than this one, and starts or restarts the async cleaner and
     * archive services.
     *
     * @param instantTime instant of the write about to start
     * @param writeOperationType operation type of the write
     * @param metaClient meta client used to read timeline state
     */
    public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
        setOperationType(writeOperationType);
        if (txnManager.isOptimisticConcurrencyControlEnabled()) {
            lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
        } else {
            lastCompletedTxnAndMetadata = Option.empty();
        }
        pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
        // The instant being written must not count as a pending instant.
        pendingInflightAndRequestedInstants.remove(instantTime);
        if (asyncCleanerService != null) {
            asyncCleanerService.start(null);
        } else {
            asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
        }
        if (asyncArchiveService != null) {
            asyncArchiveService.start(null);
        } else {
            asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(this);
        }
    }

    /**
     * Engine-specific hook taking the write metadata, a String and the table, and returning the
     * result of type {@code O}.
     * NOTE(review): the String is presumably the instant time of the write — confirm against
     * implementations.
     */
    protected abstract O postWrite(HoodieWriteMetadata<O> var1, String var2, HoodieTable var3);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Post-commit housekeeping: deletes the marker directory for the instant, then runs the
     * auto-clean and auto-archive steps. The heartbeat for the instant is stopped even if one
     * of those steps throws.
     *
     * @param table table that was written to
     * @param metadata commit metadata (not used by this implementation)
     * @param instantTime instant that was committed
     * @param extraMetadata extra metadata (not used by this implementation)
     * @param acquireLockForArchival forwarded to the archive step
     */
    protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata, boolean acquireLockForArchival) {
        try {
            WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
                    .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
            autoCleanOnCommit();
            autoArchiveOnCommit(table, acquireLockForArchival);
        } finally {
            heartbeatClient.stop(instantTime);
        }
    }

    /**
     * Runs or schedules compaction and clustering inline, according to the write config, and
     * records the decisions as entries in the commit metadata.
     *
     * <p>Nothing happens when table services are disabled, or when no table service is
     * configured to be executed or scheduled inline.
     *
     * @param table table that was written to
     * @param metadata commit metadata that receives the inline-service flags
     * @param extraMetadata extra metadata forwarded to the inline services
     */
    protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        if (!tableServicesEnabled(config)) {
            return;
        }
        final boolean anyInlineWork = config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline();
        if (!anyInlineWork) {
            return;
        }
        if (config.isMetadataTableEnabled()) {
            table.getHoodieView().sync();
        }

        // Compaction: execute inline, or otherwise schedule inline if nothing is pending.
        if (config.inlineCompactionEnabled()) {
            runAnyPendingCompactions(table);
            metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
            inlineCompaction(extraMetadata);
        } else {
            metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
        }
        if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()) {
            final boolean compactionPending = table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants().findAny().isPresent();
            if (!compactionPending) {
                metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
                inlineScheduleCompaction(extraMetadata);
            }
        }

        // Clustering: same pattern as compaction.
        if (config.inlineClusteringEnabled()) {
            runAnyPendingClustering(table);
            metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
            inlineClustering(extraMetadata);
        } else {
            metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
        }
        if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()) {
            final boolean replacePending = table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().findAny().isPresent();
            if (!replacePending) {
                metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true");
                inlineScheduleClustering(extraMetadata);
            }
        }
    }

    /**
     * Runs compaction for every pending compaction instant on the table's write timeline.
     *
     * @param table table whose active timeline is inspected
     */
    protected void runAnyPendingCompactions(HoodieTable table) {
        HoodieTimeline pendingCompactions = table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline();
        pendingCompactions.getInstants().forEach(pending -> {
            LOG.info("Running previously failed inflight compaction at instant " + pending);
            compact(pending.getTimestamp(), true);
        });
    }

    /**
     * Runs clustering for every pending replace instant for which a clustering plan can be
     * resolved; pending replace instants without a clustering plan are skipped.
     *
     * @param table table whose active timeline is inspected
     */
    protected void runAnyPendingClustering(HoodieTable table) {
        table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(pending -> {
            Option<Pair<HoodieInstant, HoodieClusteringPlan>> plan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), pending);
            if (!plan.isPresent()) {
                return;
            }
            LOG.info("Running pending clustering at instant " + plan.get().getLeft());
            cluster(pending.getTimestamp(), true);
        });
    }

    /**
     * Runs cleaning as part of a commit when auto-clean is configured: waits for the async
     * cleaner when async cleaning is configured, otherwise cleans synchronously with
     * {@code clean(true)}.
     */
    protected void autoCleanOnCommit() {
        if (!config.isAutoClean()) {
            return;
        }
        if (!config.isAsyncClean()) {
            LOG.info("Start to clean synchronously.");
            clean(true);
            return;
        }
        LOG.info("Async cleaner has been spawned. Waiting for it to finish");
        AsyncCleanerService.waitForCompletion(asyncCleanerService);
        LOG.info("Async cleaner has finished");
    }

    /**
     * Runs archival as part of a commit when auto-archive is configured: waits for the async
     * archiver when async archival is configured, otherwise archives synchronously.
     *
     * @param table table to archive
     * @param acquireLockForArchival forwarded to the synchronous archive call
     */
    protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArchival) {
        if (!config.isAutoArchive()) {
            return;
        }
        if (!config.isAsyncArchive()) {
            LOG.info("Start to archive synchronously.");
            archive(table, acquireLockForArchival);
            return;
        }
        LOG.info("Async archiver has been spawned. Waiting for it to finish");
        AsyncArchiveService.waitForCompletion(asyncArchiveService);
        LOG.info("Async archiver has finished");
    }

    /**
     * Runs all pending compactions against a freshly created table instance.
     */
    public void runAnyPendingCompactions() {
        HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
        runAnyPendingCompactions(table);
    }

    /**
     * Creates a savepoint at the latest completed commit.
     *
     * @param user user forwarded to the savepoint
     * @param comment comment forwarded to the savepoint
     * @throws HoodieSavepointException if the completed commits timeline is empty
     */
    public void savepoint(String user, String comment) {
        HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
        final boolean nothingCommitted = table.getCompletedCommitsTimeline().empty();
        if (nothingCommitted) {
            throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
        }
        final String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
        LOG.info("Savepointing latest commit " + latestCommit);
        savepoint(latestCommit, user, comment);
    }

    /**
     * Creates a savepoint at the given instant by delegating to the table.
     *
     * @param instantTime instant to savepoint
     * @param user user forwarded to the savepoint
     * @param comment comment forwarded to the savepoint
     */
    public void savepoint(String instantTime, String user, String comment) {
        createTable(config, hadoopConf).savepoint(context, instantTime, user, comment);
    }

    /**
     * Deletes the savepoint at the given time via {@code SavepointHelpers}.
     *
     * @param savepointTime time of the savepoint to delete
     */
    public void deleteSavepoint(String savepointTime) {
        SavepointHelpers.deleteSavepoint(createTable(config, hadoopConf), savepointTime);
    }

    /**
     * Restores the table to the given savepoint.
     *
     * <p>When the metadata table is enabled, this first checks whether the savepoint lies before
     * the start of the metadata table's commits timeline; if so the metadata table directory is
     * deleted and metadata table initialisation is skipped for the restore. That check is
     * best-effort: any failure is logged and the restore proceeds.
     *
     * @param savepointTime time of the savepoint to restore to
     */
    public void restoreToSavepoint(String savepointTime) {
        boolean initialMetadataTableIfNecessary = this.config.isMetadataTableEnabled();
        if (initialMetadataTableIfNecessary) {
            try {
                String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(this.config.getBasePath());
                HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(metadataTableBasePathStr).build();
                HoodieInstant syncedInstant = new HoodieInstant(false, "deltacommit", savepointTime);
                if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
                    mdtClient.getFs().delete(new Path(metadataTableBasePathStr), true);
                    initialMetadataTableIfNecessary = false;
                }
            } catch (Exception e) {
                // Deliberately non-fatal, but no longer silent: a failed delete of the metadata
                // table would otherwise go completely unnoticed.
                LOG.warn("Could not inspect or delete the metadata table of " + this.config.getBasePath() + " before restoring to savepoint " + savepointTime + "; continuing with restore", e);
            }
        }
        HoodieTable table = this.initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
        SavepointHelpers.validateSavepointPresence(table, savepointTime);
        ValidationUtils.checkArgument(!this.config.shouldArchiveBeyondSavepoint(), "Restore is not supported when " + HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT.key() + " is enabled");
        this.restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
        SavepointHelpers.validateSavepointRestore(table, savepointTime);
    }

    /**
     * Rolls back the given commit, reusing pending rollback info for it when available, and
     * without skipping locking.
     *
     * @param commitInstantTime instant to roll back
     * @return result of the delegated rollback
     * @throws HoodieRollbackException if the rollback fails
     */
    @Deprecated
    public boolean rollback(String commitInstantTime) throws HoodieRollbackException {
        HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty());
        return rollback(commitInstantTime, getPendingRollbackInfo(table.getMetaClient(), commitInstantTime), false);
    }

    /**
     * Rolls back the given commit.
     *
     * <p>If pending rollback info is supplied, its rollback instant and plan are reused;
     * otherwise a new rollback instant time is generated and a rollback is scheduled for the
     * commit found on the active timeline.
     *
     * @param commitInstantTime instant to roll back
     * @param pendingRollbackInfo previously scheduled rollback for this instant, if any
     * @param skipLocking forwarded to the table's rollback
     * @return {@code true} if the rollback was executed; {@code false} if the instant is neither
     *         on the active commits timeline nor covered by pending rollback info
     * @throws HoodieRollbackException if no rollback plan is available or the rollback fails
     */
    @Deprecated
    public boolean rollback(String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
        LOG.info("Begin rollback of instant " + commitInstantTime);
        final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
        final Timer.Context timerContext = this.metrics.getRollbackCtx();
        try {
            HoodieTable<T, I, K, O> table = this.createTable(this.config, this.hadoopConf);
            Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants().filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)).findFirst());
            if (!commitInstantOpt.isPresent() && !pendingRollbackInfo.isPresent()) {
                LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
                return false;
            }
            LOG.info(String.format("Scheduling Rollback at instant time : %s (exists in active timeline: %s), with rollback plan: %s", rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
            // The supplier only runs when there is no pending rollback info, in which case the
            // guard above guarantees commitInstantOpt is present.
            Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())).orElseGet(() -> table.scheduleRollback(this.context, rollbackInstantTime, commitInstantOpt.get(), false, this.config.shouldRollbackUsingMarkers()));
            if (!rollbackPlanOption.isPresent()) {
                throw new HoodieRollbackException("Failed to rollback " + this.config.getBasePath() + " commits " + commitInstantTime);
            }
            // If the commit is no longer on the active timeline, rebuild the instant from the plan.
            HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
                    ? table.rollback(this.context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
                    : table.rollback(this.context, rollbackInstantTime, new HoodieInstant(true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime), false, skipLocking);
            if (timerContext != null) {
                long durationInMs = this.metrics.getDurationInMs(timerContext.stop());
                this.metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted().intValue());
            }
            return true;
        } catch (Exception e) {
            throw new HoodieRollbackException("Failed to rollback " + this.config.getBasePath() + " commits " + commitInstantTime, e);
        }
    }

    /**
     * Restores the table to the given instant using a newly generated restore instant time.
     *
     * @param instantTime instant to restore to
     * @param initialMetadataTableIfNecessary forwarded to {@code initTable}
     * @return metadata describing the restore
     * @throws HoodieRestoreException if no restore plan is produced or the restore fails
     */
    public HoodieRestoreMetadata restoreToInstant(String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
        LOG.info("Begin restore to instant " + instantTime);
        final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
        final Timer.Context timerContext = metrics.getRollbackCtx();
        try {
            HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary);
            Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
            if (!restorePlanOption.isPresent()) {
                throw new HoodieRestoreException("Failed to restore " + config.getBasePath() + " to commit " + instantTime);
            }
            HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
            if (timerContext != null) {
                long elapsedMs = metrics.getDurationInMs(timerContext.stop());
                // Sum deleted files over every rollback that was part of the restore.
                long deletedFiles = restoreMetadata.getHoodieRestoreMetadata().values().stream().flatMap(Collection::stream).mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted).sum();
                metrics.updateRollbackMetrics(elapsedMs, deletedFiles);
            }
            return restoreMetadata;
        } catch (Exception e) {
            throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
        }
    }

    /**
     * Cleans at the given instant time with inline scheduling enabled and locking not skipped.
     *
     * @param cleanInstantTime instant time for the clean
     * @return clean metadata from the delegated call, possibly {@code null}
     */
    public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
        final boolean scheduleInline = true;
        final boolean skipLocking = false;
        return clean(cleanInstantTime, scheduleInline, skipLocking);
    }

    /**
     * Cleans at the given instant time with inline scheduling enabled.
     *
     * @param cleanInstantTime instant time for the clean
     * @param skipLocking forwarded to the delegated clean
     * @return clean metadata from the delegated call, possibly {@code null}
     */
    public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) throws HoodieIOException {
        final boolean scheduleInline = true;
        return clean(cleanInstantTime, scheduleInline, skipLocking);
    }

    /**
     * Rolls back failed writes according to the configured policy, optionally schedules a clean
     * inline, then executes the clean on the table and reports clean metrics.
     *
     * <p>A clean is only scheduled inline when {@code scheduleInline} is set and either multiple
     * cleans are allowed or no inflight/requested clean exists on the timeline.
     *
     * @param cleanInstantTime instant time for the clean
     * @param scheduleInline whether to schedule a clean before executing
     * @param skipLocking forwarded to failed-write rollback and to the table's clean
     * @return clean metadata returned by the table, or {@code null} when table services are disabled
     */
    public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
        if (!tableServicesEnabled(config)) {
            return null;
        }
        final Timer.Context timerContext = metrics.getCleanCtx();
        CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), "clean", () -> rollbackFailedWrites(skipLocking));
        HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
        final boolean mayStartCleaner = config.allowMultipleCleans()
                || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent();
        if (mayStartCleaner) {
            LOG.info("Cleaner started");
            if (scheduleInline) {
                scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
                table.getMetaClient().reloadActiveTimeline();
            }
        }
        HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);
        if (timerContext == null || metadata == null) {
            return metadata;
        }
        long durationMs = metrics.getDurationInMs(timerContext.stop());
        metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
        LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + " cleanerElapsedMs" + durationMs);
        return metadata;
    }

    /**
     * Cleans at a newly generated instant time without skipping locking.
     *
     * @return clean metadata from the delegated call, possibly {@code null}
     */
    public HoodieCleanMetadata clean() {
        final boolean skipLocking = false;
        return clean(skipLocking);
    }

    /**
     * Cleans at a newly generated instant time.
     *
     * @param skipLocking forwarded to the delegated clean
     * @return clean metadata from the delegated call, possibly {@code null}
     */
    public HoodieCleanMetadata clean(boolean skipLocking) {
        final String cleanInstantTime = HoodieActiveTimeline.createNewInstantTime();
        return clean(cleanInstantTime, skipLocking);
    }

    /**
     * Runs the timeline archiver against the given table when table services are enabled.
     *
     * @param table                  table handed to the {@code HoodieTimelineArchiver}
     * @param acquireLockForArchival forwarded to {@code archiveIfRequired}
     * @throws HoodieIOException wrapping any {@link IOException} raised while archiving
     */
    protected void archive(HoodieTable table, boolean acquireLockForArchival) {
        if (tableServicesEnabled(config)) {
            try {
                new HoodieTimelineArchiver(config, table).archiveIfRequired(context, acquireLockForArchival);
            }
            catch (IOException ioe) {
                throw new HoodieIOException("Failed to archive", ioe);
            }
        }
    }

    /**
     * Creates a table from the current config and Hadoop configuration and archives it,
     * passing {@code true} for {@code acquireLockForArchival}.
     */
    public void archive() {
        archive(createTable(config, hadoopConf), true);
    }

    /**
     * Starts a new commit using the commit action type reported by a freshly created meta client.
     *
     * @return the instant time returned by {@link #startCommit(String, HoodieTableMetaClient)}
     */
    public String startCommit() {
        HoodieTableMetaClient metaClient = createMetaClient(true);
        String actionType = metaClient.getCommitActionType();
        return startCommit(actionType, metaClient);
    }

    /**
     * Rolls back failed writes per the configured policy, generates a new instant time and
     * starts a commit of the given action type at that time.
     *
     * @param actionType action type for the new instant
     * @param metaClient meta client whose active timeline receives the new instant
     * @return the newly generated instant time
     */
    public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
        CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), "commit", () -> rollbackFailedWrites());
        // The instant time is generated only after the rollback step has run.
        String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
        startCommit(newInstantTime, actionType, metaClient);
        return newInstantTime;
    }

    /**
     * Starts a commit at the supplied instant time, using the commit action type reported by
     * a freshly created meta client.
     *
     * @param instantTime instant time for the new commit
     */
    public void startCommitWithTime(String instantTime) {
        HoodieTableMetaClient metaClient = createMetaClient(true);
        String actionType = metaClient.getCommitActionType();
        startCommitWithTime(instantTime, actionType, metaClient);
    }

    /**
     * Starts a commit of the given action type at the supplied instant time, against a freshly
     * created meta client.
     *
     * @param instantTime instant time for the new commit
     * @param actionType  action type for the new instant
     */
    public void startCommitWithTime(String instantTime, String actionType) {
        startCommitWithTime(instantTime, actionType, createMetaClient(true));
    }

    /**
     * Rolls back failed writes per the configured policy and then starts the commit.
     *
     * @param instantTime instant time for the new commit
     * @param actionType  action type for the new instant
     * @param metaClient  meta client whose active timeline receives the new instant
     */
    private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
        // Rollback must precede instant creation, mirroring startCommit(String, HoodieTableMetaClient).
        CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), "commit", () -> rollbackFailedWrites());
        startCommit(instantTime, actionType, metaClient);
    }

    /**
     * Creates the REQUESTED instant for a new commit on the active timeline.
     *
     * <p>Before creating it, validates that the latest pending compaction (if any) has a timestamp
     * strictly earlier than {@code instantTime}, and starts a heartbeat when the failed-writes
     * clean policy is lazy.
     *
     * @param instantTime instant time for the new commit
     * @param actionType  action type; {@code "replacecommit"} is created through
     *                    {@code createRequestedReplaceCommit}, everything else through {@code createNewInstant}
     * @param metaClient  meta client whose active timeline is read and updated
     */
    private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
        String announcement = "Generate a new instant time: " + instantTime + " action: " + actionType;
        LOG.info((Object)announcement);

        Option<HoodieInstant> latestPendingCompaction = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
        if (latestPendingCompaction.isPresent()) {
            HoodieInstant latestPending = latestPendingCompaction.get();
            boolean pendingIsEarlier = HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime);
            ValidationUtils.checkArgument(pendingIsEarlier, "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :" + latestPending + ",  Ingesting at " + instantTime);
        }

        if (config.getFailedWritesCleanPolicy().isLazy()) {
            heartbeatClient.start(instantTime);
        }

        if (!actionType.equals("replacecommit")) {
            HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, actionType, instantTime);
            metaClient.getActiveTimeline().createNewInstant(requested);
        } else {
            metaClient.getActiveTimeline().createRequestedReplaceCommit(instantTime, actionType);
        }
    }

    /**
     * Schedules a compaction at a newly generated instant time.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return the new instant time when {@link #scheduleCompactionAtInstant} reports success,
     *         otherwise an empty option
     * @throws HoodieIOException declared by the delegate
     */
    public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String compactionInstant = HoodieActiveTimeline.createNewInstantTime();
        if (scheduleCompactionAtInstant(compactionInstant, extraMetadata)) {
            return Option.of(compactionInstant);
        }
        return Option.empty();
    }

    /**
     * Schedules a compaction at the given instant time through {@code scheduleTableService}.
     *
     * @param instantTime   instant time for the compaction
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return {@code true} when the scheduler returned a non-empty option
     * @throws HoodieIOException declared by this method's signature
     */
    public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        Option<String> scheduledInstant = scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT);
        return scheduledInstant.isPresent();
    }

    /**
     * Asks a freshly created table to schedule indexing for the given partition types at a
     * newly generated instant time.
     *
     * @param partitionTypes metadata partition types forwarded to {@code scheduleIndexing}
     * @return the new instant time when an index plan was produced, otherwise an empty option
     */
    public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
        String indexInstant = HoodieActiveTimeline.createNewInstantTime();
        Option<HoodieIndexPlan> plan = createTable(config, hadoopConf).scheduleIndexing(context, indexInstant, partitionTypes);
        if (plan.isPresent()) {
            return Option.of(indexInstant);
        }
        return Option.empty();
    }

    /**
     * Runs indexing for the given instant on a freshly created table.
     *
     * @param indexInstantTime instant time forwarded to {@code HoodieTable.index}
     * @return whatever the table's {@code index} call returns
     */
    public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
        HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
        return table.index(context, indexInstantTime);
    }

    /**
     * Drops the given metadata partitions through the table's metadata writer, if one is available,
     * inside a transaction owned by a new {@code "indexing"} instant.
     *
     * <p>The transaction is begun before the {@code try} and always ended in {@code finally}.
     * NOTE: the decompiler reported "Removed try catching itself - possible behaviour change" for
     * this method, so its structure may differ slightly from the original source.
     *
     * @param partitionTypes metadata partition types to drop
     * @throws HoodieIndexException wrapping any {@link IOException} raised by the metadata writer
     */
    public void dropIndex(List<MetadataPartitionType> partitionTypes) {
        HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
        String dropInstant = HoodieActiveTimeline.createNewInstantTime();
        Option<HoodieInstant> txnOwner = Option.of(new HoodieInstant(true, "indexing", dropInstant));
        txnManager.beginTransaction(txnOwner, Option.empty());
        try {
            context.setJobStatus(getClass().getSimpleName(), "Dropping partitions from metadata table: " + config.getTableName());
            table.getMetadataWriter(dropInstant).ifPresent(writer -> {
                try {
                    ((HoodieTableMetadataWriter)writer).dropMetadataPartitions(partitionTypes);
                }
                catch (IOException e) {
                    throw new HoodieIndexException("Failed to drop metadata index. ", e);
                }
            });
        }
        finally {
            txnManager.endTransaction(txnOwner);
        }
    }

    /**
     * Runs compaction for the given instant, taking the second argument of
     * {@link #compact(String, boolean)} from {@code config.shouldAutoCommit()}.
     *
     * @param compactionInstantTime instant time of the compaction to run
     * @return the write metadata returned by the two-argument overload
     */
    public HoodieWriteMetadata<O> compact(String compactionInstantTime) {
        boolean autoCommit = config.shouldAutoCommit();
        return compact(compactionInstantTime, autoCommit);
    }

    /**
     * Abstract hook for committing a compaction; the implementation lives in subclasses.
     *
     * <p>Parameter names were lost in decompilation and no call site is visible in this part of
     * the file, so the descriptions below are inferred from the types only.
     *
     * @param var1 presumably the compaction instant time -- TODO confirm against implementations
     * @param var2 commit metadata associated with the compaction
     * @param var3 optional extra metadata map
     */
    public abstract void commitCompaction(String var1, HoodieCommitMetadata var2, Option<Map<String, String>> var3);

    /**
     * Abstract hook for completing a compaction; the implementation lives in subclasses.
     *
     * <p>Parameter names were lost in decompilation and no call site is visible in this part of
     * the file, so the descriptions below are inferred from the types only.
     *
     * @param var1 commit metadata associated with the compaction
     * @param var2 table the compaction applies to
     * @param var3 presumably the compaction instant time -- TODO confirm against implementations
     */
    protected abstract void completeCompaction(HoodieCommitMetadata var1, HoodieTable var2, String var3);

    /**
     * Returns the pending (non-compaction) commits timeline with replace commits that carry a
     * clustering plan filtered out.
     *
     * @param metaClient meta client providing the commits timeline and used to look up clustering plans
     * @return the filtered timeline
     */
    private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
        HoodieTimeline pendingWithoutCompaction = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
        return pendingWithoutCompaction.filter(instant -> {
            boolean isReplaceCommit = instant.getAction().equals("replacecommit");
            // Only replace commits are checked for a clustering plan; every other action is kept.
            return !isReplaceCommit || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent();
        });
    }

    /**
     * Looks up pending rollback info for one commit, ignoring compaction and clustering instants.
     *
     * @param metaClient       meta client used to read pending rollbacks
     * @param commitToRollback commit time to look up
     * @return the result of the three-argument overload with
     *         {@code ignoreCompactionAndClusteringInstants = true}
     */
    protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
        final boolean ignoreCompactionAndClusteringInstants = true;
        return getPendingRollbackInfo(metaClient, commitToRollback, ignoreCompactionAndClusteringInstants);
    }

    /**
     * Looks up pending rollback info for one commit.
     *
     * @param metaClient                            meta client used to read pending rollbacks
     * @param commitToRollback                      commit time used as the lookup key
     * @param ignoreCompactionAndClusteringInstants forwarded to {@code getPendingRollbackInfos}
     * @return the mapped value for {@code commitToRollback}, or an empty option when no entry exists
     */
    public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
        Map<String, Option<HoodiePendingRollbackInfo>> pendingByCommit = getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants);
        return pendingByCommit.getOrDefault(commitToRollback, Option.empty());
    }

    /**
     * Collects pending rollback info keyed by commit time, ignoring compaction and clustering instants.
     *
     * @param metaClient meta client used to read pending rollbacks
     * @return the result of the two-argument overload with
     *         {@code ignoreCompactionAndClusteringInstants = true}
     */
    protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
        final boolean ignoreCompactionAndClusteringInstants = true;
        return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants);
    }

    /**
     * Builds a map from "commit time being rolled back" to the pending rollback instant and plan
     * that targets it.
     *
     * <p>For each pending rollback instant on the active timeline:
     * <ul>
     *   <li>if its plan cannot be fetched and the instant is REQUESTED, the pending instant is deleted
     *       (delete failures are logged and ignored) and the instant is skipped;</li>
     *   <li>if its plan cannot be fetched otherwise, the instant is skipped with a warning;</li>
     *   <li>when {@code ignoreCompactionAndClusteringInstants} is set, plans targeting a
     *       {@code "compaction"} action, or a {@code "replacecommit"} that has a clustering plan,
     *       are skipped;</li>
     *   <li>any exception while processing a fetched plan is logged and the instant is skipped.</li>
     * </ul>
     * The first rollback seen for a commit time wins ({@code putIfAbsent}).
     *
     * @param metaClient                            meta client providing the active timeline
     * @param ignoreCompactionAndClusteringInstants whether to skip rollbacks of compaction/clustering instants
     * @return a mutable map of commit time to pending rollback info
     */
    protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
        List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
        Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
        for (HoodieInstant rollbackInstant : instants) {
            HoodieRollbackPlan rollbackPlan;
            try {
                rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
            }
            catch (Exception e) {
                if (rollbackInstant.isRequested()) {
                    LOG.warn((Object)("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state"), (Throwable)e);
                    try {
                        metaClient.getActiveTimeline().deletePending(rollbackInstant);
                    }
                    catch (HoodieIOException he) {
                        // Best effort: a failed delete must not abort collection of the remaining rollbacks.
                        LOG.warn((Object)("Cannot delete " + rollbackInstant), (Throwable)he);
                    }
                } else {
                    LOG.warn((Object)("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan"), (Throwable)e);
                }
                continue;
            }
            try {
                String action = rollbackPlan.getInstantToRollback().getAction();
                if (ignoreCompactionAndClusteringInstants) {
                    if ("compaction".equals(action)) {
                        continue;
                    }
                    // The clustering-plan lookup only runs for replace commits (short-circuit).
                    boolean isClustering = "replacecommit".equals(action)
                        && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(), rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
                    if (isClustering) {
                        continue;
                    }
                }
                String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
                infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
            }
            catch (Exception e) {
                LOG.warn((Object)("Processing rollback plan failed for " + rollbackInstant + ", skip the plan"), (Throwable)e);
            }
        }
        return infoMap;
    }

    /**
     * Rolls back failed writes without skipping locking.
     *
     * @return the result of {@link #rollbackFailedWrites(boolean)} called with {@code false}
     */
    protected Boolean rollbackFailedWrites() {
        final boolean skipLocking = false;
        return rollbackFailedWrites(skipLocking);
    }

    /**
     * Determines which instants need rolling back under the configured failed-writes policy,
     * merges them with already-pending rollbacks, and rolls them all back.
     *
     * @param skipLocking forwarded to {@link #rollbackFailedWrites(Map, boolean)}
     * @return always {@code true}
     */
    protected Boolean rollbackFailedWrites(boolean skipLocking) {
        HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
        List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
        Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
        // Instants without an existing pending rollback are added with an empty info entry.
        for (String instantTime : instantsToRollback) {
            pendingRollbacks.putIfAbsent(instantTime, Option.empty());
        }
        rollbackFailedWrites(pendingRollbacks, skipLocking);
        return true;
    }

    /**
     * Rolls back the given instants from newest to oldest and deletes each instant's heartbeat file.
     *
     * <p>When an instant time less than or equal to {@code "00000000000002"} is reached, the failed
     * bootstrap is rolled back instead, that instant's heartbeat file is deleted, and processing
     * stops (older instants are not visited).
     *
     * @param instantsToRollback map of instant time to optional pending rollback info
     * @param skipLocking        forwarded to {@code rollback}
     */
    protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
        // Sort by instant time descending; LinkedHashMap keeps that order for the loop below.
        LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet().stream()
            .sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
        for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
            String instantTime = entry.getKey();
            if (HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, "00000000000002")) {
                rollbackFailedBootstrap();
                HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instantTime, config);
                break;
            }
            rollback(instantTime, entry.getValue(), skipLocking);
            HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instantTime, config);
        }
    }

    /**
     * Selects the inflight instants (excluding compaction and clustering) that should be rolled
     * back under the given cleaning policy, newest first.
     *
     * <ul>
     *   <li>eager: every inflight instant except {@code curInstantTime}, when that is present;</li>
     *   <li>lazy: only instants whose heartbeat has expired;</li>
     *   <li>never: an empty list.</li>
     * </ul>
     *
     * @param metaClient     meta client providing the inflight timeline
     * @param cleaningPolicy policy deciding which instants qualify
     * @param curInstantTime instant to exclude under the eager policy, if present
     * @return instant times to roll back
     * @throws HoodieException          when a heartbeat check fails with an {@link IOException}
     * @throws IllegalArgumentException when the policy is none of eager, lazy or never
     */
    protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
        Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient).getReverseOrderedInstants();
        if (cleaningPolicy.isEager()) {
            return inflightInstantsStream
                .map(HoodieInstant::getTimestamp)
                .filter(timestamp -> !curInstantTime.isPresent() || !timestamp.equals(curInstantTime.get()))
                .collect(Collectors.toList());
        }
        if (cleaningPolicy.isLazy()) {
            return inflightInstantsStream.filter(instant -> {
                try {
                    return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
                }
                catch (IOException io) {
                    throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
                }
            }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
        }
        if (cleaningPolicy.isNever()) {
            // Type-safe replacement for the raw Collections.EMPTY_LIST constant.
            return Collections.emptyList();
        }
        // Report the policy that was actually evaluated; callers may pass one that differs from the config.
        throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + cleaningPolicy);
    }

    /**
     * Abstract compaction entry point implemented by subclasses.
     *
     * <p>Within this file it is called by {@code compact(String)} with the compaction instant time
     * and {@code config.shouldAutoCommit()}, and by {@code inlineCompaction} with the scheduled
     * instant time and {@code true}.
     *
     * @param var1 compaction instant time
     * @param var2 boolean flag; presumably "commit/complete after compacting" -- TODO confirm against implementations
     * @return write metadata produced by the compaction
     */
    protected abstract HoodieWriteMetadata<O> compact(String var1, boolean var2);

    /**
     * Schedules a compaction inline and, when one was scheduled, immediately runs it with
     * {@code true} as the second argument of {@link #compact(String, boolean)}.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return the scheduled compaction instant time, or an empty option when nothing was scheduled
     */
    protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = inlineScheduleCompaction(extraMetadata);
        if (scheduledInstant.isPresent()) {
            compact(scheduledInstant.get(), true);
        }
        return scheduledInstant;
    }

    /**
     * Schedules a compaction for inline execution by delegating to {@link #scheduleCompaction}.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return the scheduled instant time, or an empty option
     */
    protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = scheduleCompaction(extraMetadata);
        return scheduledInstant;
    }

    /**
     * Schedules clustering at a newly generated instant time.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return the new instant time when {@link #scheduleClusteringAtInstant} reports success,
     *         otherwise an empty option
     * @throws HoodieIOException declared by the delegate
     */
    public Option<String> scheduleClustering(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String clusteringInstant = HoodieActiveTimeline.createNewInstantTime();
        if (scheduleClusteringAtInstant(clusteringInstant, extraMetadata)) {
            return Option.of(clusteringInstant);
        }
        return Option.empty();
    }

    /**
     * Schedules clustering at the given instant time through {@code scheduleTableService}.
     *
     * @param instantTime   instant time for the clustering
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return {@code true} when the scheduler returned a non-empty option
     * @throws HoodieIOException declared by this method's signature
     */
    public boolean scheduleClusteringAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        Option<String> scheduledInstant = scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER);
        return scheduledInstant.isPresent();
    }

    /**
     * Schedules cleaning at a newly generated instant time.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return the new instant time when {@link #scheduleCleaningAtInstant} reports success,
     *         otherwise an empty option
     * @throws HoodieIOException declared by the delegate
     */
    protected Option<String> scheduleCleaning(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        String cleanInstant = HoodieActiveTimeline.createNewInstantTime();
        if (scheduleCleaningAtInstant(cleanInstant, extraMetadata)) {
            return Option.of(cleanInstant);
        }
        return Option.empty();
    }

    /**
     * Schedules cleaning at the given instant time through {@code scheduleTableService}.
     *
     * @param instantTime   instant time for the clean
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return {@code true} when the scheduler returned a non-empty option
     * @throws HoodieIOException declared by this method's signature
     */
    protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
        Option<String> scheduledInstant = scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN);
        return scheduledInstant.isPresent();
    }

    /**
     * Abstract clustering entry point implemented by subclasses.
     *
     * <p>Within this file it is called by {@code inlineClustering} with the scheduled clustering
     * instant time and {@code true}.
     *
     * @param var1 clustering instant time
     * @param var2 boolean flag; presumably "commit/complete after clustering" -- TODO confirm against implementations
     * @return write metadata produced by the clustering
     */
    public abstract HoodieWriteMetadata<O> cluster(String var1, boolean var2);

    /**
     * Schedules the given table service at a newly generated instant time.
     *
     * @param extraMetadata    optional extra metadata forwarded to the scheduler
     * @param tableServiceType which table service to schedule
     * @return the result of {@link #scheduleTableService(String, Option, TableServiceType)}
     */
    public Option<String> scheduleTableService(Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
        return scheduleTableService(HoodieActiveTimeline.createNewInstantTime(), extraMetadata, tableServiceType);
    }

    /**
     * Schedules the given table service at {@code instantTime} inside a transaction owned by a
     * REQUESTED instant of the service's action.
     *
     * <p>The transaction is begun inside the {@code try}, so {@code endTransaction} runs even when
     * {@code beginTransaction} itself throws. NOTE: the decompiler reported "Removed try catching
     * itself - possible behaviour change" for this method, so its structure may differ slightly
     * from the original source.
     *
     * @param instantTime      instant time at which to schedule
     * @param extraMetadata    optional extra metadata forwarded to the scheduler
     * @param tableServiceType which table service to schedule
     * @return the result of {@code scheduleTableServiceInternal}
     */
    public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
        Option<HoodieInstant> requestedInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, tableServiceType.getAction(), instantTime));
        try {
            txnManager.beginTransaction(requestedInstant, Option.empty());
            LOG.info((Object)("Scheduling table service " + tableServiceType));
            return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
        }
        finally {
            txnManager.endTransaction(requestedInstant);
        }
    }

    /**
     * Dispatches scheduling to the table for the requested service type. No transaction is
     * started here.
     *
     * @param instantTime      instant time at which to schedule
     * @param extraMetadata    optional extra metadata forwarded to the table
     * @param tableServiceType which table service to schedule
     * @return {@code instantTime} when the table produced a plan; an empty option when table
     *         services are disabled, when the type is ARCHIVE, or when no plan was produced
     * @throws IllegalArgumentException for a service type not handled by the switch
     */
    private Option<String> scheduleTableServiceInternal(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
        if (!tableServicesEnabled(config)) {
            return Option.empty();
        }
        boolean planCreated;
        switch (tableServiceType) {
            case ARCHIVE:
                LOG.info((Object)"Scheduling archiving is not supported. Skipping.");
                return Option.empty();
            case CLUSTER:
                LOG.info((Object)("Scheduling clustering at instant time :" + instantTime));
                planCreated = createTable(config, hadoopConf).scheduleClustering(context, instantTime, extraMetadata).isPresent();
                break;
            case COMPACT:
                LOG.info((Object)("Scheduling compaction at instant time :" + instantTime));
                planCreated = createTable(config, hadoopConf).scheduleCompaction(context, instantTime, extraMetadata).isPresent();
                break;
            case CLEAN:
                LOG.info((Object)("Scheduling cleaning at instant time :" + instantTime));
                planCreated = createTable(config, hadoopConf).scheduleCleaning(context, instantTime, extraMetadata).isPresent();
                break;
            default:
                throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
        }
        if (planCreated) {
            return Option.of(instantTime);
        }
        return Option.empty();
    }

    /**
     * Schedules clustering inline and, when one was scheduled, immediately runs it with
     * {@code true} as the second argument of {@link #cluster(String, boolean)}.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return the scheduled clustering instant time, or an empty option when nothing was scheduled
     */
    protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = inlineScheduleClustering(extraMetadata);
        if (scheduledInstant.isPresent()) {
            cluster(scheduledInstant.get(), true);
        }
        return scheduledInstant;
    }

    /**
     * Schedules clustering for inline execution by delegating to {@link #scheduleClustering}.
     *
     * @param extraMetadata optional extra metadata forwarded to the scheduler
     * @return the scheduled instant time, or an empty option
     */
    protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
        Option<String> scheduledInstant = scheduleClustering(extraMetadata);
        return scheduledInstant;
    }

    /**
     * Asks the table to finalize the write and, when a finalize timer is available, logs and
     * records the elapsed time together with the number of write stats.
     *
     * @param table       table whose {@code finalizeWrite} is invoked
     * @param instantTime instant time of the write being finalized
     * @param stats       write stats passed to the table; their count is reported to metrics
     * @throws HoodieCommitException wrapping any {@code HoodieIOException} raised in the process
     */
    protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
        try {
            Timer.Context finalizeTimer = metrics.getFinalizeCtx();
            table.finalizeWrite(context, instantTime, stats);
            if (finalizeTimer == null) {
                return;
            }
            Option<Long> elapsedMs = Option.of(metrics.getDurationInMs(finalizeTimer.stop()));
            if (elapsedMs.isPresent()) {
                Long duration = elapsedMs.get();
                LOG.info((Object)("Finalize write elapsed time (milliseconds): " + duration));
                metrics.updateFinalizeWriteMetrics((long)duration, stats.size());
            }
        }
        catch (HoodieIOException ioe) {
            throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
        }
    }

    /**
     * @return the metrics object held by this client
     */
    public HoodieMetrics getMetrics() {
        return metrics;
    }

    /**
     * @return the index held by this client
     */
    public HoodieIndex<?, ?> getIndex() {
        return index;
    }

    /**
     * Abstract table-initialisation hook implemented by subclasses.
     *
     * <p>Within this file it is called by {@code initTable} with the meta client, the optional
     * instant time and the {@code initialMetadataTableIfNecessary} flag, after {@code tryUpgrade}
     * and while the transaction begun by {@code initTable} is still open.
     *
     * @param var1 meta client for the table
     * @param var2 optional instant time of the operation
     * @param var3 the caller's {@code initialMetadataTableIfNecessary} flag
     * @return the initialised table
     */
    protected abstract HoodieTable doInitTable(HoodieTableMetaClient var1, Option<String> var2, boolean var3);

    /**
     * Creates a meta client, runs any needed upgrade/downgrade and initialises the table inside a
     * transaction, validates table properties, and starts the timer matching the operation type.
     *
     * <p>For DELETE operations the write schema is first taken from the latest commit metadata.
     * The transaction owner is an instant built from {@code instantTime} when it is present,
     * otherwise an empty option. NOTE: the decompiler reported "Removed try catching itself -
     * possible behaviour change" for this method, so its structure may differ slightly from the
     * original source.
     *
     * @param operationType                   write operation being started
     * @param instantTime                     optional instant time of the operation
     * @param initialMetadataTableIfNecessary forwarded to {@code doInitTable}
     * @return the table returned by {@code doInitTable}
     */
    protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
        HoodieTableMetaClient metaClient = createMetaClient(true);
        if (operationType == WriteOperationType.DELETE) {
            setWriteSchemaForDeletes(metaClient);
        }

        Option<HoodieInstant> txnOwner = Option.empty();
        if (instantTime.isPresent()) {
            String commitActionType = CommitUtils.getCommitActionType(operationType, metaClient.getTableType());
            txnOwner = Option.of(new HoodieInstant(true, commitActionType, instantTime.get()));
        }

        HoodieTable initialisedTable;
        txnManager.beginTransaction(txnOwner, Option.empty());
        try {
            tryUpgrade(metaClient, instantTime);
            initialisedTable = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
        }
        finally {
            txnManager.endTransaction(txnOwner);
        }

        // Validation runs after the transaction has been released.
        metaClient.validateTableProperties(config.getProps());

        switch (operationType) {
            case INSERT:
            case INSERT_PREPPED:
            case UPSERT:
            case UPSERT_PREPPED:
            case BULK_INSERT:
            case BULK_INSERT_PREPPED:
            case INSERT_OVERWRITE:
            case INSERT_OVERWRITE_TABLE:
                setWriteTimer(initialisedTable);
                break;
            case CLUSTER:
                clusteringTimer = metrics.getClusteringCtx();
                break;
            case COMPACT:
                compactionTimer = metrics.getCompactionCtx();
                break;
        }
        return initialisedTable;
    }

    /**
     * Initialises the table, taking the metadata-table flag from {@code config.isMetadataTableEnabled()}.
     *
     * @param operationType write operation being started
     * @param instantTime   optional instant time of the operation
     * @return the table returned by the three-argument overload
     */
    protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
        boolean metadataTableEnabled = config.isMetadataTableEnabled();
        return initTable(operationType, instantTime, metadataTableEnabled);
    }

    /**
     * Sets the config's schema from the {@code "schema"} entry in the extra metadata of the
     * latest completed commit or replace commit.
     *
     * @param metaClient meta client providing the active timeline and commit action type
     * @throws HoodieIOException when there is no prior completed commit, when the latest commit
     *                           has no {@code "schema"} entry, or when reading its metadata fails
     */
    protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
        try {
            HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
            Option<HoodieInstant> latestCommit = activeTimeline.filterCompletedInstants()
                .filter(s -> s.getAction().equals(metaClient.getCommitActionType()) || s.getAction().equals("replacecommit"))
                .lastInstant();
            if (!latestCommit.isPresent()) {
                throw new HoodieIOException("Deletes issued without any prior commits");
            }
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(latestCommit.get()).get(), HoodieCommitMetadata.class);
            if (!commitMetadata.getExtraMetadata().containsKey("schema")) {
                throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
            }
            config.setSchema(commitMetadata.getExtraMetadata().get("schema"));
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
        }
    }

    /**
     * Hook for releasing resources. The implementation here is intentionally empty;
     * presumably subclasses override it -- TODO confirm.
     */
    protected void releaseResources() {
    }

    /**
     * Shuts down the async archive and cleaner services, then closes the superclass, the index,
     * the heartbeat client and the transaction manager, in that order.
     */
    @Override
    public void close() {
        // Force-stop background services and drop the references to them.
        AsyncArchiveService.forceShutdown(asyncArchiveService);
        asyncArchiveService = null;
        AsyncCleanerService.forceShutdown(asyncCleanerService);
        asyncCleanerService = null;

        super.close();

        // Remaining collaborators are released only after the superclass has closed.
        index.close();
        heartbeatClient.stop();
        txnManager.close();
    }

    /**
     * Starts the write timer that matches the table's commit action type: the commit timer for
     * {@code "commit"}, the delta-commit timer for {@code "deltacommit"}, and nothing otherwise.
     *
     * @param table table whose meta client supplies the commit action type
     */
    private void setWriteTimer(HoodieTable table) {
        switch (table.getMetaClient().getCommitActionType()) {
            case "commit":
                writeTimer = metrics.getCommitCtx();
                break;
            case "deltacommit":
                writeTimer = metrics.getDeltaCommitCtx();
                break;
            default:
                // Other action types leave the write timer untouched.
                break;
        }
    }

    /**
     * Upgrades or downgrades the table to the current table version when needed.
     *
     * <p>When a version change is needed, the meta client is reloaded, inflight instants selected
     * under the EAGER policy (excluding {@code instantTime}) are rolled back with locking skipped,
     * the upgrade/downgrade is run, and the active timeline is reloaded.
     *
     * @param metaClient  meta client of the table to check
     * @param instantTime optional instant time of the current operation
     */
    protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
        UpgradeDowngrade versionCheck = new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
        if (!versionCheck.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
            return;
        }

        // Everything below works against a reloaded meta client.
        HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
        List<String> instantsToRollback = getInstantsToRollback(reloadedMetaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);
        if (!instantsToRollback.isEmpty()) {
            Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(reloadedMetaClient);
            for (String instant : instantsToRollback) {
                pendingRollbacks.putIfAbsent(instant, Option.empty());
            }
            rollbackFailedWrites(pendingRollbacks, true);
        }

        UpgradeDowngrade runner = new UpgradeDowngrade(reloadedMetaClient, config, context, upgradeDowngradeHelper);
        runner.run(HoodieTableVersion.current(), instantTime.orElse(null));
        reloadedMetaClient.reloadActiveTimeline();
    }

    /**
     * Adds a column to the table's internal schema and commits the resulting schema change.
     *
     * @param colName      name of the column to add
     * @param schema       Avro schema of the new column, converted via {@code AvroInternalSchemaConverter.convertToField}
     * @param doc          column documentation
     * @param position     position argument forwarded to {@code applyAddChange}
     * @param positionType position type forwarded to {@code applyAddChange}
     */
    public void addColumn(String colName, Schema schema, String doc, String position, TableChange.ColumnPositionChange.ColumnPositionType positionType) {
        Pair<InternalSchema, HoodieTableMetaClient> schemaAndClient = getInternalSchemaAndMetaClient();
        InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schemaAndClient.getLeft());
        InternalSchema evolvedSchema = applier.applyAddChange(colName, AvroInternalSchemaConverter.convertToField(schema), doc, position, positionType);
        commitTableChange(evolvedSchema, schemaAndClient.getRight());
    }

    /**
     * Adds a column with no documentation, an empty position string and the
     * {@code NO_OPERATION} position type.
     *
     * @param colName name of the column to add
     * @param schema  Avro schema of the new column
     */
    public void addColumn(String colName, Schema schema) {
        final String noDoc = null;
        final String noPosition = "";
        addColumn(colName, schema, noDoc, noPosition, TableChange.ColumnPositionChange.ColumnPositionType.NO_OPERATION);
    }

    /**
     * Deletes the named columns from the table's internal schema and commits the schema change.
     *
     * @param colNames names of the columns to delete
     */
    public void deleteColumns(String ... colNames) {
        Pair<InternalSchema, HoodieTableMetaClient> schemaAndClient = getInternalSchemaAndMetaClient();
        InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schemaAndClient.getLeft());
        InternalSchema evolvedSchema = applier.applyDeleteChange(colNames);
        commitTableChange(evolvedSchema, schemaAndClient.getRight());
    }

    /**
     * Renames a column in the table's internal schema and commits the schema change.
     *
     * @param colName current column name
     * @param newName new column name
     */
    public void renameColumn(String colName, String newName) {
        Pair<InternalSchema, HoodieTableMetaClient> schemaAndClient = getInternalSchemaAndMetaClient();
        InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schemaAndClient.getLeft());
        InternalSchema evolvedSchema = applier.applyRenameChange(colName, newName);
        commitTableChange(evolvedSchema, schemaAndClient.getRight());
    }

    /**
     * Changes a column's nullability in the table's internal schema and commits the schema change.
     *
     * @param colName  column to update
     * @param nullable new nullability flag
     */
    public void updateColumnNullability(String colName, boolean nullable) {
        Pair<InternalSchema, HoodieTableMetaClient> schemaAndClient = getInternalSchemaAndMetaClient();
        InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schemaAndClient.getLeft());
        InternalSchema evolvedSchema = applier.applyColumnNullabilityChange(colName, nullable);
        commitTableChange(evolvedSchema, schemaAndClient.getRight());
    }

    /**
     * Changes a column's type in the table's internal schema and commits the schema change.
     *
     * @param colName column to update
     * @param newType new column type
     */
    public void updateColumnType(String colName, Type newType) {
        Pair<InternalSchema, HoodieTableMetaClient> schemaAndClient = getInternalSchemaAndMetaClient();
        InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schemaAndClient.getLeft());
        InternalSchema evolvedSchema = applier.applyColumnTypeChange(colName, newType);
        commitTableChange(evolvedSchema, schemaAndClient.getRight());
    }

    /**
     * Changes a column's comment in the table's internal schema and commits the schema change.
     *
     * @param colName column to update
     * @param doc     new column comment
     */
    public void updateColumnComment(String colName, String doc) {
        Pair<InternalSchema, HoodieTableMetaClient> schemaAndClient = getInternalSchemaAndMetaClient();
        InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schemaAndClient.getLeft());
        InternalSchema evolvedSchema = applier.applyColumnCommentChange(colName, doc);
        commitTableChange(evolvedSchema, schemaAndClient.getRight());
    }

    /**
     * Reorders a column relative to a reference column in the table's internal schema and commits
     * the schema change. Does nothing when any argument is {@code null}.
     *
     * @param colName      column to move
     * @param referColName reference column
     * @param orderType    position type forwarded to {@code applyReOrderColPositionChange}
     */
    public void reOrderColPosition(String colName, String referColName, TableChange.ColumnPositionChange.ColumnPositionType orderType) {
        boolean allArgumentsPresent = colName != null && orderType != null && referColName != null;
        if (allArgumentsPresent) {
            Pair<InternalSchema, HoodieTableMetaClient> schemaAndClient = getInternalSchemaAndMetaClient();
            InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schemaAndClient.getLeft());
            InternalSchema evolvedSchema = applier.applyReOrderColPositionChange(colName, referColName, orderType);
            commitTableChange(evolvedSchema, schemaAndClient.getRight());
        }
    }

    /**
     * Creates a meta client and resolves the table's current internal schema through it.
     *
     * @return a pair of (internal schema, meta client)
     */
    private Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() {
        HoodieTableMetaClient metaClient = createMetaClient(true);
        InternalSchema currentSchema = getInternalSchema(new TableSchemaResolver(metaClient));
        return Pair.of(currentSchema, metaClient);
    }

    /**
     * Commits a schema change as an ALTER_SCHEMA write with no write stats.
     *
     * <p>Steps, in order: read the history schema string (falling back to one inherited from the
     * current internal schema), start a commit at a new instant time, set the config schema to the
     * Avro form of {@code newSchema}, transition the requested instant to inflight, persist the
     * updated history schema, and commit with {@code "latest_schema"} in the extra metadata.
     *
     * @param newSchema  internal schema after the change
     * @param metaClient meta client of the table being altered
     * @throws HoodieCommitException when saving the inflight metadata fails with an {@link IOException}
     */
    private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) {
        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        String historySchemaStr = schemaResolver.getTableHistorySchemaStrFromCommitMetadata()
            .orElseGet(() -> SerDeHelper.inheritSchemas(getInternalSchema(schemaResolver), ""));
        Schema avroSchema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
        String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
        String instantTime = HoodieActiveTimeline.createNewInstantTime();

        startCommitWithTime(instantTime, commitActionType, metaClient);
        config.setSchema(avroSchema.toString());

        HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
        HoodieInstant requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, commitActionType, instantTime);
        HoodieCommitMetadata inflightMetadata = new HoodieCommitMetadata();
        inflightMetadata.setOperationType(WriteOperationType.ALTER_SCHEMA);
        try {
            byte[] inflightBytes = inflightMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
            activeTimeline.transitionRequestedToInflight(requestedInstant, Option.of(inflightBytes));
        }
        catch (IOException io) {
            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
        }

        // The schema id is set from the instant time before serialising the new schema.
        Map<String, String> extraMeta = new HashMap<>();
        extraMeta.put("latest_schema", SerDeHelper.toJson(newSchema.setSchemaId(Long.parseLong(instantTime))));
        FileBasedInternalSchemaStorageManager schemaStorage = new FileBasedInternalSchemaStorageManager(metaClient);
        schemaStorage.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
        commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
    }

    /**
     * Resolves the table's internal schema: the one stored in commit metadata when available,
     * otherwise the table's Avro schema converted to an internal schema.
     *
     * @param schemaUtil resolver used to read the schema
     * @return the resolved internal schema
     * @throws HoodieException when the fallback Avro schema cannot be obtained or converted; the
     *                         underlying failure is attached as the cause
     */
    private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
        return schemaUtil.getTableInternalSchemaFromCommitMetadata().orElseGet(() -> {
            try {
                return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema());
            }
            catch (Exception e) {
                // Keep the original failure as the cause so the root problem stays diagnosable.
                throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()), e);
            }
        });
    }
}

