/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.AbstractHoodieClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStat;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Base class for write clients: holds the index and metrics handles and implements the
 * shared commit and rollback plumbing against the table's active timeline.
 *
 * <p>Subclasses supply {@link #postCommit(HoodieCommitMetadata, String, Option)}, which is
 * invoked after a commit instant has been saved as complete.
 *
 * @param <T> payload type of the records being written
 */
public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
    private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
    /** Label passed to {@code HoodieMetrics.updateIndexMetrics} for index-update timings. */
    private static final String UPDATE_STR = "update";
    // Field initializer runs after the super constructor, so it reads the config set up there.
    private final transient HoodieMetrics metrics = new HoodieMetrics(this.config, this.config.getTableName());
    private final transient HoodieIndex<T> index;
    /** Timer for the write currently in progress; set by {@link #getTableAndInitCtx} and cleared on commit. */
    private transient Timer.Context writeContext = null;

    /**
     * Creates a write client.
     *
     * @param jsc          Spark context, forwarded to the parent client
     * @param index        index used for location updates and index rollbacks
     * @param clientConfig write configuration, forwarded to the parent client
     */
    protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig) {
        super(jsc, clientConfig);
        this.index = index;
    }

    /**
     * Creates a write client with an optional embedded timeline service.
     *
     * @param jsc            Spark context, forwarded to the parent client
     * @param index          index used for location updates and index rollbacks
     * @param clientConfig   write configuration, forwarded to the parent client
     * @param timelineServer optional timeline service, forwarded to the parent client
     */
    protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineServer) {
        super(jsc, clientConfig, timelineServer);
        this.index = index;
    }

    /**
     * Commits the given write statuses without any extra metadata.
     *
     * @param commitTime    instant time of the commit
     * @param writeStatuses write statuses produced by the write
     * @return {@code true} once the commit has been saved
     */
    public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
        return commit(commitTime, writeStatuses, Option.empty());
    }

    /**
     * Commits the given write statuses, using the commit action type reported by a freshly
     * created meta client.
     *
     * @param commitTime    instant time of the commit
     * @param writeStatuses write statuses produced by the write
     * @param extraMetadata optional key/value pairs copied into the commit metadata
     * @return {@code true} once the commit has been saved
     */
    public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) {
        String actionType = createMetaClient(false).getCommitActionType();
        return commit(commitTime, writeStatuses, extraMetadata, actionType);
    }

    /**
     * Persists the write statuses, updates the index with them (recording the elapsed time in
     * the index metrics) and then commits if auto-commit is enabled.
     *
     * @param writeStatusRDD write statuses produced by the write
     * @param table          table being written to
     * @param commitTime     instant time of the write
     * @return the write statuses returned by the index update
     */
    protected JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> table, String commitTime) {
        // Persist before the RDD is handed to the index so later actions can reuse it.
        JavaRDD<WriteStatus> persisted = writeStatusRDD.persist(config.getWriteStatusStorageLevel());
        Timer.Context indexTimer = metrics.getIndexCtx();
        JavaRDD<WriteStatus> statuses = index.updateLocation(persisted, jsc, table);
        // The timer context may be null; report a zero duration in that case.
        metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
        commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
        return statuses;
    }

    /**
     * Commits the write when the configuration enables auto-commit; otherwise only logs.
     *
     * @param commitTime instant time of the write
     * @param resultRDD  write statuses to commit
     * @param actionType timeline action type to commit under
     * @throws HoodieCommitException if the commit reports failure
     */
    protected void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD, String actionType) {
        if (!config.shouldAutoCommit().booleanValue()) {
            LOG.info("Auto commit disabled for " + commitTime);
            return;
        }
        LOG.info("Auto commit enabled: Committing " + commitTime);
        boolean committed = commit(commitTime, resultRDD, Option.empty(), actionType);
        if (!committed) {
            throw new HoodieCommitException("Failed to commit " + commitTime);
        }
    }

    /**
     * Builds the commit metadata from the write stats, finalizes the write, saves the instant
     * as complete on the active timeline, runs {@link #postCommit} and updates commit metrics.
     *
     * @param commitTime    instant time of the commit
     * @param writeStatuses write statuses whose stats go into the commit metadata
     * @param extraMetadata optional key/value pairs copied into the commit metadata
     * @param actionType    timeline action type to commit under
     * @return always {@code true}; failures surface as {@link HoodieCommitException}
     * @throws HoodieCommitException on I/O failure or if {@code commitTime} cannot be parsed
     */
    private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String actionType) {
        LOG.info("Commiting " + commitTime);
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
        HoodieCommitMetadata metadata = new HoodieCommitMetadata();
        List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();

        updateMetadataAndRollingStats(actionType, metadata, stats);
        finalizeWrite(table, commitTime, stats);

        if (extraMetadata.isPresent()) {
            extraMetadata.get().forEach(metadata::addMetadata);
        }
        metadata.addMetadata("schema", config.getSchema());

        try {
            byte[] metadataBytes = metadata.toJsonString().getBytes(StandardCharsets.UTF_8);
            activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime), Option.of(metadataBytes));
            postCommit(metadata, commitTime, extraMetadata);
            if (writeContext != null) {
                long durationInMs = metrics.getDurationInMs(writeContext.stop());
                long commitEpochMs = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime();
                metrics.updateCommitMetrics(commitEpochMs, durationInMs, metadata, actionType);
                writeContext = null;
            }
            LOG.info("Committed " + commitTime);
        } catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime, e);
        } catch (ParseException e) {
            throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime + "Instant time is not of valid format", e);
        }
        return true;
    }

    /**
     * Hook invoked after the commit instant has been saved as complete.
     *
     * @param var1 metadata of the commit that was just saved
     * @param var2 instant time of the commit
     * @param var3 extra metadata supplied by the caller of commit, if any
     * @throws IOException if post-commit work fails; reported by commit as a commit failure
     */
    protected abstract void postCommit(HoodieCommitMetadata var1, String var2, Option<Map<String, String>> var3) throws IOException;

    /**
     * Delegates to {@code table.finalizeWrite} and, when a timer context is available,
     * logs the elapsed time and records it in the finalize-write metrics.
     *
     * @param table       table being written to
     * @param instantTime instant time of the write
     * @param stats       write stats of the write
     * @throws HoodieCommitException if finalization raises a {@link HoodieIOException}
     */
    protected void finalizeWrite(HoodieTable<T> table, String instantTime, List<HoodieWriteStat> stats) {
        try {
            Timer.Context finalizeCtx = metrics.getFinalizeCtx();
            table.finalizeWrite(jsc, instantTime, stats);
            if (finalizeCtx != null) {
                long durationInMs = metrics.getDurationInMs(finalizeCtx.stop());
                LOG.info("Finalize write elapsed time (milliseconds): " + durationInMs);
                metrics.updateFinalizeWriteMetrics(durationInMs, stats.size());
            }
        } catch (HoodieIOException ioe) {
            throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
        }
    }

    /**
     * Adds every write stat to the commit metadata, derives rolling stats from them, merges
     * in the rolling stats stored under {@code "ROLLING_STAT"} in the last completed commit
     * (if any) and stores the result under the same key.
     *
     * @param actionType timeline action type of the commit
     * @param metadata   commit metadata to populate
     * @param writeStats write stats of the commit
     * @throws HoodieCommitException if reading or serializing the metadata fails
     */
    private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, List<HoodieWriteStat> writeStats) {
        try {
            HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
            HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
            for (HoodieWriteStat stat : writeStats) {
                String partitionPath = stat.getPartitionPath();
                metadata.addWriteStat(partitionPath, stat);
                HoodieRollingStat rollingStat = new HoodieRollingStat(
                        stat.getFileId(),
                        stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()),
                        stat.getNumUpdateWrites(),
                        stat.getNumDeletes(),
                        stat.getTotalWriteBytes());
                rollingStatMetadata.addRollingStat(partitionPath, rollingStat);
            }

            Option<HoodieInstant> lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
            if (lastInstant.isPresent()) {
                HoodieCommitMetadata lastCommitMetadata = HoodieCommitMetadata.fromBytes(
                        table.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
                String lastRollingStat = lastCommitMetadata.getExtraMetadata().get("ROLLING_STAT");
                if (lastRollingStat != null) {
                    // Commit metadata is written as UTF-8 by commit(); decode with the same charset
                    // rather than the platform default.
                    rollingStatMetadata = rollingStatMetadata.merge(
                            HoodieCommitMetadata.fromBytes(lastRollingStat.getBytes(StandardCharsets.UTF_8), HoodieRollingStatMetadata.class));
                }
            }
            metadata.addMetadata("ROLLING_STAT", rollingStatMetadata.toJsonString());
        } catch (IOException io) {
            // Keep the cause so the underlying I/O failure is visible to callers.
            throw new HoodieCommitException("Unable to save rolling stats", io);
        }
    }

    /** @return the metrics handle of this client */
    public HoodieMetrics getMetrics() {
        return metrics;
    }

    /** @return the index used by this client */
    public HoodieIndex<T> getIndex() {
        return index;
    }

    /**
     * Creates the table handle for a write and starts the write timer: the commit timer when
     * the table's commit action type equals {@code "commit"}, the delta-commit timer otherwise.
     * For {@link OperationType#DELETE} the write schema is first taken from the last completed
     * commit.
     *
     * @param operationType kind of write about to be performed
     * @return the table handle
     */
    protected HoodieTable getTableAndInitCtx(OperationType operationType) {
        HoodieTableMetaClient metaClient = createMetaClient(true);
        if (operationType == OperationType.DELETE) {
            setWriteSchemaFromLastInstant(metaClient);
        }
        HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
        if (table.getMetaClient().getCommitActionType().equals("commit")) {
            writeContext = metrics.getCommitCtx();
        } else {
            writeContext = metrics.getDeltaCommitCtx();
        }
        return table;
    }

    /**
     * Sets the config's schema to the {@code "schema"} entry stored in the extra metadata of
     * the last completed instant whose action matches the table's commit action type.
     *
     * @param metaClient meta client of the table
     * @throws HoodieIOException if there is no such instant, it carries no schema, or its
     *                           metadata cannot be read
     */
    private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
        try {
            HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
            Option<HoodieInstant> lastInstant = activeTimeline.filterCompletedInstants()
                    .filter(s -> s.getAction().equals(metaClient.getCommitActionType()))
                    .lastInstant();
            if (!lastInstant.isPresent()) {
                throw new HoodieIOException("Deletes issued without any prior commits");
            }
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                    activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
            if (!commitMetadata.getExtraMetadata().containsKey("schema")) {
                throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
            }
            config.setSchema(commitMetadata.getExtraMetadata().get("schema"));
        } catch (IOException e) {
            throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
        }
    }

    /** Closes the parent client and then the index. */
    @Override
    public void close() {
        super.close();
        index.close();
    }

    /**
     * Rolls back the instant with the given timestamp if it is present on the commits
     * timeline, then records the rollback on the timeline. Logs a warning and does nothing
     * when no such instant exists.
     *
     * @param commitToRollback timestamp of the instant to roll back
     * @throws HoodieRollbackException if the rollback fails with an I/O error
     */
    protected void rollbackInternal(String commitToRollback) {
        String startRollbackTime = HoodieActiveTimeline.createNewInstantTime();
        Timer.Context context = metrics.getRollbackCtx();
        try {
            HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
            Option<HoodieInstant> rollbackInstantOpt = Option.fromJavaOptional(
                    table.getActiveTimeline().getCommitsTimeline().getInstants()
                            .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
                            .findFirst());
            if (rollbackInstantOpt.isPresent()) {
                List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
                finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);
            } else {
                // Make the no-op visible instead of returning silently.
                LOG.warn("Cannot find instant " + commitToRollback + " in the timeline, for rollback");
            }
        } catch (IOException e) {
            throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, e);
        }
    }

    /**
     * Validates that the instant may be rolled back (not savepointed, no later completed
     * commits, and last among the pending commits when any are pending), rolls it back on the
     * table and then rolls back the index.
     *
     * @param instantToRollback instant to roll back
     * @return rollback stats returned by the table
     * @throws IOException              declared for callers; propagated from the table rollback
     * @throws HoodieRollbackException if a precondition is violated or the index rollback fails
     */
    protected List<HoodieRollbackStat> doRollbackAndGetStats(HoodieInstant instantToRollback) throws IOException {
        String commitToRollback = instantToRollback.getTimestamp();
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        HoodieTimeline pendingCommitTimeline = table.getPendingCommitTimeline();
        HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();

        // A savepointed commit must have its savepoint deleted before it can be rolled back.
        List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
                .map(HoodieInstant::getTimestamp)
                .collect(Collectors.toList());
        for (String savepoint : savepoints) {
            if (savepoint.contains(commitToRollback)) {
                throw new HoodieRollbackException("Could not rollback a savepointed commit. Delete savepoint first before rolling back" + savepoint);
            }
        }

        if (commitTimeline.empty() && pendingCommitTimeline.empty()) {
            // Only logged; the rollback below is still attempted.
            LOG.info("No commits to rollback " + commitToRollback);
        }

        // Rollbacks must proceed newest-first: refuse if completed commits exist after this one.
        if (commitToRollback != null && !commitTimeline.empty()
                && !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) {
            throw new HoodieRollbackException("Found commits after time :" + commitToRollback + ", please rollback greater commits first");
        }

        // Likewise refuse unless this instant is the last of the pending commits (when any exist).
        List<String> inflights = pendingCommitTimeline.getInstants()
                .map(HoodieInstant::getTimestamp)
                .collect(Collectors.toList());
        if (commitToRollback != null && !inflights.isEmpty()
                && inflights.indexOf(commitToRollback) != inflights.size() - 1) {
            throw new HoodieRollbackException("Found in-flight commits after time :" + commitToRollback + ", please rollback greater commits first");
        }

        List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
        LOG.info("Deleted inflight commits " + commitToRollback);

        if (!getIndex().rollbackCommit(commitToRollback)) {
            throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
        }
        LOG.info("Index rolled back for commits " + commitToRollback);
        return stats;
    }

    /**
     * Records rollback metrics, writes the rollback instant (inflight, then complete) with the
     * serialized rollback metadata, and deletes older rollback meta files when the cleaner
     * timeline is non-empty.
     *
     * @param context           rollback timer context, may be {@code null}
     * @param rollbackStats     stats produced by the rollback
     * @param commitsToRollback timestamps of the commits that were rolled back
     * @param startRollbackTime instant time under which the rollback is recorded
     * @throws IOException if writing the rollback instant or cleaning meta files fails
     */
    private void finishRollback(Timer.Context context, List<HoodieRollbackStat> rollbackStats, List<String> commitsToRollback, String startRollbackTime) throws IOException {
        HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        Option<Long> durationInMs = Option.empty();
        long numFilesDeleted = rollbackStats.stream()
                .mapToLong(stat -> stat.getSuccessDeleteFiles().size())
                .sum();
        if (context != null) {
            durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
            metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
        }

        HoodieRollbackMetadata rollbackMetadata = AvroUtils.convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats);
        table.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, "rollback", startRollbackTime));
        table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, "rollback", startRollbackTime), AvroUtils.serializeRollbackMetadata(rollbackMetadata));
        LOG.info("Rollback of Commits " + commitsToRollback + " is complete");

        if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
            LOG.info("Cleaning up older rollback meta files");
            FSUtils.deleteOlderRollbackMetaFiles(this.fs, table.getMetaClient().getMetaPath(), table.getActiveTimeline().getRollbackTimeline().getInstants());
        }
    }

    /** Kinds of write operation a client can perform; see {@link #getTableAndInitCtx}. */
    enum OperationType {
        INSERT,
        INSERT_PREPPED,
        UPSERT,
        UPSERT_PREPPED,
        DELETE,
        BULK_INSERT,
        BULK_INSERT_PREPPED,
        BOOTSTRAP;
    }
}

