/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieConcatHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SparkMergeHelper;
import org.apache.hudi.table.action.commit.UpsertPartitioner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload>
extends BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
    private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
    protected Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();

    public BaseSparkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) {
        super(context, config, table, instantTime, operationType, Option.empty());
        this.initKeyGenIfNeeded(config.populateMetaFields());
    }

    public BaseSparkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option extraMetadata) {
        super(context, config, table, instantTime, operationType, extraMetadata);
        this.initKeyGenIfNeeded(config.populateMetaFields());
    }

    private void initKeyGenIfNeeded(boolean populateMetaFields) {
        if (!populateMetaFields) {
            try {
                this.keyGeneratorOpt = Option.of((BaseKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(this.config.getProps())));
            }
            catch (IOException e) {
                throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
            }
        }
    }

    private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
        Set fileGroupsInPendingClustering = this.table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> (HoodieFileGroupId)entry.getKey()).collect(Collectors.toSet());
        UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils.loadClass(this.config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
        Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups = updateStrategy.handleUpdate(inputRecordsRDD);
        Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
        if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
            return recordsAndPendingClusteringFileGroups.getLeft();
        }
        if (this.config.isRollbackPendingClustering()) {
            Set<HoodieInstant> pendingClusteringInstantsToRollback = ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(this.table.getMetaClient()).entrySet().stream().filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey())).map(Map.Entry::getValue).collect(Collectors.toSet());
            pendingClusteringInstantsToRollback.forEach(instant -> {
                String commitTime = HoodieActiveTimeline.createNewInstantTime();
                this.table.scheduleRollback(this.context, commitTime, (HoodieInstant)instant, false, this.config.shouldRollbackUsingMarkers());
                this.table.rollback(this.context, commitTime, (HoodieInstant)instant, true, true);
            });
            this.table.getMetaClient().reloadActiveTimeline();
        }
        return recordsAndPendingClusteringFileGroups.getLeft();
    }

    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
        HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<JavaRDD<WriteStatus>>();
        if (inputRecordsRDD.getStorageLevel() == StorageLevel.NONE()) {
            inputRecordsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        } else {
            LOG.info((Object)("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel()));
        }
        WorkloadProfile profile = null;
        if (this.isWorkloadProfileNeeded()) {
            this.context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
            profile = new WorkloadProfile(this.buildProfile(inputRecordsRDD), this.operationType);
            LOG.info((Object)("Workload profile :" + profile));
            this.saveWorkloadProfileMetadataToInflight(profile, this.instantTime);
        }
        JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = this.clusteringHandleUpdate(inputRecordsRDD);
        Partitioner partitioner = this.getPartitioner(profile);
        JavaRDD<HoodieRecord<T>> partitionedRecords = this.partition(inputRecordsRDDWithClusteringUpdate, partitioner);
        JavaRDD writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((Function2 & Serializable)(partition, recordItr) -> {
            if (WriteOperationType.isChangingRecords(this.operationType)) {
                return this.handleUpsertPartition(this.instantTime, (Integer)partition, (Iterator)recordItr, partitioner);
            }
            return this.handleInsertPartition(this.instantTime, (Integer)partition, (Iterator)recordItr, partitioner);
        }, true).flatMap(List::iterator);
        this.updateIndexAndCommitIfNeeded((JavaRDD<WriteStatus>)writeStatusRDD, result);
        return result;
    }

    private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
        HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<String, WorkloadStat>();
        WorkloadStat globalStat = new WorkloadStat();
        Map partitionLocationCounts = inputRecordsRDD.mapToPair((PairFunction & Serializable)record -> new Tuple2((Object)new Tuple2((Object)record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record)).countByKey();
        for (Map.Entry e : partitionLocationCounts.entrySet()) {
            String partitionPath = (String)((Tuple2)e.getKey())._1();
            Long count = (Long)e.getValue();
            Option locOption = (Option)((Tuple2)e.getKey())._2();
            if (!partitionPathStatMap.containsKey(partitionPath)) {
                partitionPathStatMap.put(partitionPath, new WorkloadStat());
            }
            if (locOption.isPresent()) {
                ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addUpdates((HoodieRecordLocation)locOption.get(), count);
                globalStat.addUpdates((HoodieRecordLocation)locOption.get(), count);
                continue;
            }
            ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addInserts(count);
            globalStat.addInserts(count);
        }
        return Pair.of(partitionPathStatMap, globalStat);
    }

    protected Partitioner getPartitioner(WorkloadProfile profile) {
        if (WriteOperationType.isChangingRecords(this.operationType)) {
            return this.getUpsertPartitioner(profile);
        }
        return this.getInsertPartitioner(profile);
    }

    private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
        JavaPairRDD partitionedRDD;
        JavaPairRDD mappedRDD = dedupedRecords.mapToPair((PairFunction & Serializable)record -> new Tuple2((Object)new Tuple2((Object)record.getKey(), Option.ofNullable(record.getCurrentLocation())), record));
        if (this.table.requireSortedRecords()) {
            Comparator comparator = (Comparator & Serializable)(t1, t2) -> {
                HoodieKey key1 = (HoodieKey)t1._1;
                HoodieKey key2 = (HoodieKey)t2._1;
                return key1.getRecordKey().compareTo(key2.getRecordKey());
            };
            partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
        } else {
            partitionedRDD = mappedRDD.partitionBy(partitioner);
        }
        return partitionedRDD.map(Tuple2::_2);
    }

    protected JavaRDD<WriteStatus> updateIndex(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
        writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(this.config.getProps()));
        Instant indexStartTime = Instant.now();
        JavaRDD<WriteStatus> statuses = HoodieJavaRDD.getJavaRDD(this.table.getIndex().updateLocation((HoodieData<WriteStatus>)HoodieJavaRDD.of(writeStatusRDD), this.context, this.table));
        result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result.setWriteStatuses(statuses);
        return statuses;
    }

    protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
        this.updateIndex(writeStatusRDD, result);
        result.setPartitionToReplaceFileIds(this.getPartitionToReplacedFileIds(result));
        this.commitOnAutoCommit(result);
    }

    @Override
    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    @Override
    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
        this.commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
        String actionType = this.getCommitActionType();
        LOG.info((Object)("Committing " + this.instantTime + ", action Type " + actionType));
        result.setCommitted(true);
        result.setWriteStats(writeStats);
        this.finalizeWrite(this.instantTime, writeStats, result);
        try {
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), extraMetadata, this.operationType, this.getSchemaToStoreInCommit(), this.getCommitActionType());
            this.writeTableMetadata(metadata, actionType);
            activeTimeline.saveAsComplete(new HoodieInstant(true, this.getCommitActionType(), this.instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            LOG.info((Object)("Committed " + this.instantTime));
            result.setCommitMetadata(Option.of(metadata));
        }
        catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, e);
        }
    }

    protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeStatuses) {
        return Collections.emptyMap();
    }

    protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        UpsertPartitioner upsertPartitioner = (UpsertPartitioner)partitioner;
        BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
        BucketType btype = binfo.bucketType;
        try {
            if (btype.equals((Object)BucketType.INSERT)) {
                return this.handleInsert(binfo.fileIdPrefix, recordItr);
            }
            if (btype.equals((Object)BucketType.UPDATE)) {
                return this.handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
            }
            throw new HoodieUpsertException("Unknown bucketType " + (Object)((Object)btype) + " for partition :" + partition);
        }
        catch (Throwable t) {
            String msg = "Error upserting bucketType " + (Object)((Object)btype) + " for partition :" + partition;
            LOG.error((Object)msg, t);
            throw new HoodieUpsertException(msg, t);
        }
    }

    protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        return this.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
    }

    @Override
    public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        if (!recordItr.hasNext()) {
            LOG.info((Object)("Empty partition with fileId => " + fileId));
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        HoodieMergeHandle upsertHandle = this.getUpdateHandle(partitionPath, fileId, recordItr);
        return this.handleUpdateInternal(upsertHandle, fileId);
    }

    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId) throws IOException {
        if (upsertHandle.getOldFilePath() == null) {
            throw new HoodieUpsertException("Error in finding the old file path at commit " + this.instantTime + " for fileId: " + fileId);
        }
        SparkMergeHelper.newInstance().runMerge(this.table, upsertHandle);
        if (upsertHandle.getPartitionPath() == null) {
            LOG.info((Object)("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + upsertHandle.writeStatuses()));
        }
        return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
    }

    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        if (this.table.requireSortedRecords()) {
            return new HoodieSortedMergeHandle(this.config, this.instantTime, (HoodieSparkTable)this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, this.keyGeneratorOpt);
        }
        if (!WriteOperationType.isChangingRecords(this.operationType) && this.config.allowDuplicateInserts()) {
            return new HoodieConcatHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, this.keyGeneratorOpt);
        }
        return new HoodieMergeHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, this.keyGeneratorOpt);
    }

    @Override
    public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception {
        if (!recordItr.hasNext()) {
            LOG.info((Object)"Empty partition");
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        return new SparkLazyInsertIterable<T>(recordItr, true, this.config, this.instantTime, this.table, idPfx, this.taskContextSupplier, new CreateHandleFactory());
    }

    public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        if (profile == null) {
            throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
        }
        return new UpsertPartitioner(profile, this.context, this.table, this.config);
    }

    public Partitioner getInsertPartitioner(WorkloadProfile profile) {
        return this.getUpsertPartitioner(profile);
    }

    @Override
    protected void runPrecommitValidators(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
        SparkValidatorUtils.runValidators(this.config, writeMetadata, this.context, this.table, this.instantTime);
    }
}

