/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.BaseHoodieTableServiceClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Table service client that runs compaction, log compaction and clustering through a
 * {@link HoodieSparkTable} and hands the resulting write statuses back as a
 * {@code JavaRDD<WriteStatus>}.
 *
 * @param <T> type parameter retained for source compatibility; it is not referenced inside this class
 */
public class SparkRDDTableServiceClient<T>
extends BaseHoodieTableServiceClient<JavaRDD<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class);

    /**
     * Creates the client by delegating to the base table service client.
     *
     * @param context         engine context forwarded to the base class
     * @param clientConfig    write config forwarded to the base class
     * @param timelineService optional embedded timeline service forwarded to the base class
     */
    protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineService) {
        super(context, clientConfig, timelineService);
    }

    /**
     * Runs compaction for the given instant.
     *
     * <p>If the instant is already present in the pending compaction timeline as inflight, it is
     * rolled back and the active timeline is reloaded before compaction is executed.
     *
     * @param compactionInstantTime instant time of the compaction to run
     * @param shouldComplete        when {@code true} and commit metadata is present, the compaction is also committed
     * @return the compaction write metadata with write statuses exposed as a {@code JavaRDD}
     */
    @Override
    protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
        HoodieSparkTable<?> table = HoodieSparkTable.create(this.config, this.context);
        HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
        HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
        if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
            table.rollbackInflightCompaction(inflightInstant,
                commitToRollback -> this.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
            table.getMetaClient().reloadActiveTimeline();
        }
        this.compactionTimer = this.metrics.getCompactionCtx();
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(this.context, compactionInstantTime);
        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
            writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
        if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
            this.completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
        }
        return compactionMetadata;
    }

    /**
     * Runs log compaction for the given instant.
     *
     * <p>Unlike {@link #compact(String, boolean)}, finding an inflight log compaction for the
     * instant is treated as an error: the inflight instant is rolled back, the active timeline is
     * reloaded, and a {@link HoodieException} is thrown.
     *
     * @param logCompactionInstantTime instant time of the log compaction to run
     * @param shouldComplete           when {@code true} and commit metadata is present, the log compaction is also committed
     * @return the log compaction write metadata with write statuses exposed as a {@code JavaRDD}
     * @throws HoodieException if an inflight log compaction already exists for the instant
     */
    @Override
    protected HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
        HoodieSparkTable<?> table = HoodieSparkTable.create(this.config, this.context);
        HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
        HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
        if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
            LOG.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
            table.rollbackInflightLogCompaction(inflightInstant,
                commitToRollback -> this.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
            table.getMetaClient().reloadActiveTimeline();
            throw new HoodieException("Inflight logcompaction file exists");
        }
        this.logCompactionTimer = this.metrics.getLogCompactionCtx();
        // NOTE(review): the returned markers handle is discarded; the call is kept in case the
        // factory has side effects (e.g. validation of the marker type) - confirm and remove if not.
        WriteMarkersFactory.get(this.config.getMarkersType(), table, logCompactionInstantTime);
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.logCompact(this.context, logCompactionInstantTime);
        HoodieWriteMetadata<JavaRDD<WriteStatus>> logCompactionMetadata =
            writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
        if (shouldComplete && logCompactionMetadata.getCommitMetadata().isPresent()) {
            this.completeTableService(TableServiceType.LOG_COMPACT, logCompactionMetadata.getCommitMetadata().get(), table, logCompactionInstantTime);
        }
        return logCompactionMetadata;
    }

    /**
     * Commits a compaction whose commit metadata has already been produced.
     *
     * @param compactionInstantTime instant time of the compaction being committed
     * @param metadata              commit metadata of the compaction; extra metadata entries are added to it
     * @param extraMetadata         optional key/value pairs to add to {@code metadata} before committing
     */
    @Override
    public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        HoodieSparkTable<?> table = HoodieSparkTable.create(this.config, this.context);
        extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
        this.completeCompaction(metadata, table, compactionInstantTime);
    }

    /**
     * Finalizes and commits a compaction inside a transaction, then removes the marker directory
     * and, if a compaction timer was started, reports commit metrics.
     *
     * @param metadata             commit metadata of the compaction
     * @param table                table the compaction ran against
     * @param compactionCommitTime instant time of the compaction
     */
    @Override
    protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + this.config.getTableName());
        List<HoodieWriteStat> writeStats = metadata.getWriteStats();
        HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
        try {
            this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
            this.finalizeWrite(table, compactionCommitTime, writeStats);
            this.updateTableMetadata(table, metadata, compactionInstant);
            LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata);
            CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
        } finally {
            // Always release the transaction, including when the commit fails.
            this.txnManager.endTransaction(Option.of(compactionInstant));
        }
        WriteMarkersFactory.get(this.config.getMarkersType(), table, compactionCommitTime)
            .quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        if (this.compactionTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.compactionTimer.stop());
            HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime)
                .ifPresent(parsedInstant -> this.metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, "compaction"));
        }
        LOG.info("Compacted successfully on commit {}", compactionCommitTime);
    }

    /**
     * Finalizes and commits a log compaction inside a transaction (running {@link #preCommit}
     * first), then removes the marker directory and, if a log compaction timer was started,
     * reports commit metrics.
     *
     * @param metadata                commit metadata of the log compaction
     * @param table                   table the log compaction ran against
     * @param logCompactionCommitTime instant time of the log compaction
     */
    @Override
    protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String logCompactionCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
        List<HoodieWriteStat> writeStats = metadata.getWriteStats();
        HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "logcompaction", logCompactionCommitTime);
        try {
            this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
            this.preCommit(metadata);
            this.finalizeWrite(table, logCompactionCommitTime, writeStats);
            this.updateTableMetadata(table, metadata, logCompactionInstant);
            LOG.info("Committing Log Compaction {}. Finished with result {}", logCompactionCommitTime, metadata);
            CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
        } finally {
            // Always release the transaction, including when the commit fails.
            this.txnManager.endTransaction(Option.of(logCompactionInstant));
        }
        WriteMarkersFactory.get(this.config.getMarkersType(), table, logCompactionCommitTime)
            .quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        // logCompact() starts logCompactionTimer, so that is the timer that must be stopped here
        // (not compactionTimer, which belongs to regular compaction).
        if (this.logCompactionTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.logCompactionTimer.stop());
            HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime)
                .ifPresent(parsedInstant -> this.metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, "logcompaction"));
        }
        LOG.info("Log Compacted successfully on commit {}", logCompactionCommitTime);
    }

    /**
     * Runs clustering for the given instant.
     *
     * <p>If the instant is already present in the pending replace timeline as inflight, it is
     * rolled back and the active timeline is reloaded before clustering is executed. The result is
     * validated to contain at least one write status.
     *
     * @param clusteringInstant instant time of the clustering to run
     * @param shouldComplete    when {@code true} and commit metadata is present, the clustering is also committed
     * @return the clustering write metadata with write statuses exposed as a {@code JavaRDD}
     * @throws HoodieClusteringException if clustering produced no write statuses
     */
    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        HoodieSparkTable<?> table = HoodieSparkTable.create(this.config, this.context);
        HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
        HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
        if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
            table.rollbackInflightClustering(inflightInstant,
                commitToRollback -> this.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
            table.getMetaClient().reloadActiveTimeline();
        }
        this.clusteringTimer = this.metrics.getClusteringCtx();
        LOG.info("Starting clustering at {}", clusteringInstant);
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(this.context, clusteringInstant);
        HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata =
            writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
        this.validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
        if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
            this.completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
        }
        return clusteringMetadata;
    }

    /**
     * Dispatches to the completion routine matching the table service type.
     *
     * @param tableServiceType kind of table service that produced {@code metadata}
     * @param metadata         commit metadata; must be a {@link HoodieReplaceCommitMetadata} for {@code CLUSTER}
     * @param table            table the service ran against
     * @param commitInstant    instant time of the table service
     * @throws IllegalArgumentException if the service type is not CLUSTER, COMPACT or LOG_COMPACT
     */
    private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, HoodieTable table, String commitInstant) {
        switch (tableServiceType) {
            case CLUSTER:
                this.completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
                break;
            case COMPACT:
                this.completeCompaction(metadata, table, commitInstant);
                break;
            case LOG_COMPACT:
                this.completeLogCompaction(metadata, table, commitInstant);
                break;
            default:
                throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
        }
    }

    /**
     * Finalizes and commits a clustering inside a transaction, then removes the marker directory
     * and, if a clustering timer was started, reports commit metrics.
     *
     * @param metadata             replace-commit metadata of the clustering
     * @param table                table the clustering ran against
     * @param clusteringCommitTime instant time of the clustering
     * @throws HoodieClusteringException if any write stat reports write errors, or if anything
     *                                   fails while committing inside the transaction
     */
    private void completeClustering(HoodieReplaceCommitMetadata metadata, HoodieTable table, String clusteringCommitTime) {
        List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().values().stream()
            .flatMap(List::stream)
            .collect(Collectors.toList());
        // Refuse to commit when any file reported write errors, naming the affected file ids.
        if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0L) {
            String failedFileIds = writeStats.stream()
                .filter(s -> s.getTotalWriteErrors() > 0L)
                .map(HoodieWriteStat::getFileId)
                .collect(Collectors.joining(","));
            throw new HoodieClusteringException("Clustering failed to write to files:" + failedFileIds);
        }
        HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
        try {
            this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
            this.finalizeWrite(table, clusteringCommitTime, writeStats);
            this.updateTableMetadata(table, metadata, clusteringInstant);
            LOG.info("Committing Clustering {}. Finished with result {}", clusteringCommitTime, metadata);
            table.getActiveTimeline().transitionReplaceInflightToComplete(clusteringInstant, Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
        } finally {
            // Always release the transaction, including when the commit fails.
            this.txnManager.endTransaction(Option.of(clusteringInstant));
        }
        WriteMarkersFactory.get(this.config.getMarkersType(), table, clusteringCommitTime)
            .quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        if (this.clusteringTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.clusteringTimer.stop());
            HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime)
                .ifPresent(parsedInstant -> this.metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, "replacecommit"));
        }
        LOG.info("Clustering successfully on commit {}", clusteringCommitTime);
    }

    /**
     * Fails the clustering when it produced no write statuses.
     *
     * <p>The clustering plan is only loaded on the failure path, to enrich the error message with
     * the number of input groups and the total number of expected output file groups.
     *
     * @param clusteringMetadata   result of the clustering run
     * @param clusteringCommitTime instant time of the clustering
     * @param table                table the clustering ran against
     * @throws HoodieClusteringException if the write statuses are empty, or the plan cannot be read
     */
    private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
        if (!clusteringMetadata.getWriteStatuses().isEmpty()) {
            return;
        }
        HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
            .map(Pair::getRight)
            .orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + clusteringCommitTime));
        throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
            + " #groups: " + clusteringPlan.getInputGroups().size()
            + " expected at least " + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
            + " write statuses");
    }

    /**
     * Applies the commit metadata to the table's metadata writer, if the table provides one.
     *
     * @param table          table whose metadata writer is updated
     * @param commitMetadata commit metadata to apply
     * @param hoodieInstant  instant the commit metadata belongs to
     */
    private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata, HoodieInstant hoodieInstant) {
        String instantTime = hoodieInstant.getTimestamp();
        boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), instantTime);
        // `table` is a raw type here, so the writer must be cast explicitly.
        table.getMetadataWriter(instantTime)
            .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, instantTime, isTableServiceAction));
    }

    /**
     * Creates a metadata table writer when the metadata table is enabled, and closes it again.
     *
     * <p>NOTE(review): presumably creating the writer is what initializes the metadata table;
     * confirm against {@link SparkHoodieBackedTableMetadataWriter}. The writer is closed right
     * away because nothing else holds a reference to it.
     *
     * @param inFlightInstantTimestamp optional inflight instant timestamp forwarded to the writer factory
     * @throws HoodieException if closing the freshly created writer fails
     */
    protected void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
        if (!this.config.isMetadataTableEnabled()) {
            return;
        }
        HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(
            this.context.getHadoopConf().get(), this.config, this.context, Option.empty(), inFlightInstantTimestamp);
        try {
            writer.close();
        } catch (Exception e) {
            throw new HoodieException("Failed to close metadata table writer after initialization", e);
        }
    }

    /**
     * Creates a {@link HoodieSparkTable} for the given config using this client's engine context.
     *
     * @param config     write config used to create the table
     * @param hadoopConf not used by this implementation
     * @return a newly created Spark table
     */
    @Override
    protected HoodieTable<?, ?, ?, ?> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        return HoodieSparkTable.create(config, this.context);
    }

    /**
     * Resolves write conflicts for the given commit metadata against the pending inflight and
     * requested instants tracked by this client.
     *
     * @param metadata commit metadata about to be committed
     */
    @Override
    protected void preCommit(HoodieCommitMetadata metadata) {
        HoodieTable<?, ?, ?, ?> table = this.createTable(this.config, this.hadoopConf);
        this.resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
    }
}

