/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieConcatHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.apache.hudi.table.action.commit.SparkHoodiePartitioner;
import org.apache.hudi.table.action.commit.UpsertPartitioner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload>
extends BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
    private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
    protected final Option<BaseKeyGenerator> keyGeneratorOpt;

    public BaseSparkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) {
        this(context, config, table, instantTime, operationType, Option.empty());
    }

    public BaseSparkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option<Map<String, String>> extraMetadata) {
        super(context, config, table, instantTime, operationType, extraMetadata);
        try {
            this.keyGeneratorOpt = config.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator(this.config.getProps()));
        }
        catch (IOException e) {
            throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
        }
    }

    private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + this.config.getTableName());
        UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils.loadClass(this.config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
        Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups = updateStrategy.handleUpdate(inputRecords);
        Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
        if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
            return recordsAndPendingClusteringFileGroups.getLeft();
        }
        if (this.config.isRollbackPendingClustering()) {
            Set<HoodieInstant> pendingClusteringInstantsToRollback = ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(this.table.getMetaClient()).entrySet().stream().filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey())).map(Map.Entry::getValue).collect(Collectors.toSet());
            pendingClusteringInstantsToRollback.forEach(instant -> {
                String commitTime = HoodieActiveTimeline.createNewInstantTime();
                this.table.scheduleRollback(this.context, commitTime, (HoodieInstant)instant, false, this.config.shouldRollbackUsingMarkers());
                this.table.rollback(this.context, commitTime, (HoodieInstant)instant, true, true);
            });
            this.table.getMetaClient().reloadActiveTimeline();
        }
        return recordsAndPendingClusteringFileGroups.getLeft();
    }

    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
        Set<HoodieFileGroupId> fileGroupsInPendingClustering;
        JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
        if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
            inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        } else {
            LOG.info((Object)("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel()));
        }
        WorkloadProfile workloadProfile = null;
        if (this.isWorkloadProfileNeeded()) {
            this.context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + this.config.getTableName());
            workloadProfile = new WorkloadProfile(this.buildProfile(inputRecords), this.operationType, this.table.getIndex().canIndexLogFiles());
            LOG.info((Object)("Input workload profile :" + workloadProfile));
        }
        Partitioner partitioner = this.getPartitioner(workloadProfile);
        if (this.isWorkloadProfileNeeded()) {
            this.saveWorkloadProfileMetadataToInflight(workloadProfile, this.instantTime);
        }
        HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = (fileGroupsInPendingClustering = this.table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet())).isEmpty() ? inputRecords : this.clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
        this.context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + this.config.getTableName());
        HoodieData<WriteStatus> writeStatuses = this.mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
        HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<HoodieData<WriteStatus>>();
        this.updateIndexAndCommitIfNeeded(writeStatuses, result);
        return result;
    }

    private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(HoodieData<HoodieRecord<T>> inputRecords) {
        HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<String, WorkloadStat>();
        WorkloadStat globalStat = new WorkloadStat();
        Map partitionLocationCounts = inputRecords.mapToPair(record -> Pair.of(new Tuple2((Object)record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record)).countByKey();
        for (Map.Entry e : partitionLocationCounts.entrySet()) {
            String partitionPath = (String)((Tuple2)e.getKey())._1();
            Long count2 = e.getValue();
            Option locOption = (Option)((Tuple2)e.getKey())._2();
            if (!partitionPathStatMap.containsKey(partitionPath)) {
                partitionPathStatMap.put(partitionPath, new WorkloadStat());
            }
            if (locOption.isPresent()) {
                ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addUpdates((HoodieRecordLocation)locOption.get(), count2);
                globalStat.addUpdates((HoodieRecordLocation)locOption.get(), count2);
                continue;
            }
            ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addInserts(count2);
            globalStat.addInserts(count2);
        }
        return Pair.of(partitionPathStatMap, globalStat);
    }

    protected Partitioner getPartitioner(WorkloadProfile profile) {
        Option<String> layoutPartitionerClass = this.table.getStorageLayout().layoutPartitionerClass();
        if (layoutPartitionerClass.isPresent()) {
            return this.getLayoutPartitioner(profile, layoutPartitionerClass.get());
        }
        if (WriteOperationType.isChangingRecords(this.operationType)) {
            return this.getUpsertPartitioner(profile);
        }
        return this.getInsertPartitioner(profile);
    }

    private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
        JavaPairRDD partitionedRDD;
        JavaPairRDD mappedRDD = HoodieJavaPairRDD.getJavaPairRDD(dedupedRecords.mapToPair(record -> Pair.of(new Tuple2((Object)record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)));
        if (this.table.requireSortedRecords()) {
            Comparator comparator = (Comparator & Serializable)(t1, t2) -> {
                HoodieKey key1 = (HoodieKey)t1._1;
                HoodieKey key2 = (HoodieKey)t2._1;
                return key1.getRecordKey().compareTo(key2.getRecordKey());
            };
            partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
        } else {
            partitionedRDD = mappedRDD.partitionBy(partitioner);
        }
        return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((Function2 & Serializable)(partition, recordItr) -> {
            if (WriteOperationType.isChangingRecords(this.operationType)) {
                return this.handleUpsertPartition(this.instantTime, (Integer)partition, (Iterator)recordItr, partitioner);
            }
            return this.handleInsertPartition(this.instantTime, (Integer)partition, (Iterator)recordItr, partitioner);
        }, true).flatMap(List::iterator));
    }

    protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        writeStatuses.persist(this.config.getString(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE));
        Instant indexStartTime = Instant.now();
        HoodieData<WriteStatus> statuses = this.table.getIndex().updateLocation(writeStatuses, this.context, this.table);
        result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result.setWriteStatuses(statuses);
        return statuses;
    }

    protected void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatusRDD, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        this.updateIndex(writeStatusRDD, result);
        result.setPartitionToReplaceFileIds(this.getPartitionToReplacedFileIds(result));
        this.commitOnAutoCommit(result);
    }

    @Override
    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    @Override
    protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().map(WriteStatus::getStat).collectAsList(), result.getPartitionToReplaceFileIds(), this.extraMetadata, this.operationType, this.getSchemaToStoreInCommit(), this.getCommitActionType())));
    }

    @Override
    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect: " + this.config.getTableName());
        this.commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
        String actionType = this.getCommitActionType();
        LOG.info((Object)("Committing " + this.instantTime + ", action Type " + actionType + ", operation Type " + (Object)((Object)this.operationType)));
        result.setCommitted(true);
        result.setWriteStats(writeStats);
        this.finalizeWrite(this.instantTime, writeStats, result);
        try {
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            HoodieCommitMetadata metadata = result.getCommitMetadata().get();
            this.writeTableMetadata(metadata, actionType);
            activeTimeline.saveAsComplete(new HoodieInstant(true, this.getCommitActionType(), this.instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            LOG.info((Object)("Committed " + this.instantTime));
            result.setCommitMetadata(Option.of(metadata));
        }
        catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, e);
        }
    }

    protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeStatuses) {
        return Collections.emptyMap();
    }

    protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        SparkHoodiePartitioner upsertPartitioner = (SparkHoodiePartitioner)partitioner;
        BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
        BucketType btype = binfo.bucketType;
        try {
            if (btype.equals((Object)BucketType.INSERT)) {
                return this.handleInsert(binfo.fileIdPrefix, recordItr);
            }
            if (btype.equals((Object)BucketType.UPDATE)) {
                return this.handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
            }
            throw new HoodieUpsertException("Unknown bucketType " + (Object)((Object)btype) + " for partition :" + partition);
        }
        catch (Throwable t) {
            String msg = "Error upserting bucketType " + (Object)((Object)btype) + " for partition :" + partition;
            LOG.error((Object)msg, t);
            throw new HoodieUpsertException(msg, t);
        }
    }

    protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        return this.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
    }

    @Override
    public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        if (!recordItr.hasNext()) {
            LOG.info((Object)("Empty partition with fileId => " + fileId));
            return Collections.emptyIterator();
        }
        HoodieMergeHandle upsertHandle = this.getUpdateHandle(partitionPath, fileId, recordItr);
        return this.handleUpdateInternal(upsertHandle, fileId);
    }

    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId) throws IOException {
        if (upsertHandle.getOldFilePath() == null) {
            throw new HoodieUpsertException("Error in finding the old file path at commit " + this.instantTime + " for fileId: " + fileId);
        }
        HoodieMergeHelper.newInstance().runMerge(this.table, upsertHandle);
        if (upsertHandle.getPartitionPath() == null) {
            LOG.info((Object)("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + upsertHandle.writeStatuses()));
        }
        return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
    }

    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        if (this.table.requireSortedRecords()) {
            return new HoodieSortedMergeHandle(this.config, this.instantTime, (HoodieSparkTable)this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, this.keyGeneratorOpt);
        }
        if (!WriteOperationType.isChangingRecords(this.operationType) && this.config.allowDuplicateInserts()) {
            return new HoodieConcatHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, this.keyGeneratorOpt);
        }
        return new HoodieMergeHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, this.keyGeneratorOpt);
    }

    @Override
    public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) throws Exception {
        if (!recordItr.hasNext()) {
            LOG.info((Object)"Empty partition");
            return Collections.emptyIterator();
        }
        return new SparkLazyInsertIterable<T>(recordItr, true, this.config, this.instantTime, this.table, idPfx, this.taskContextSupplier, new CreateHandleFactory());
    }

    public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        if (profile == null) {
            throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
        }
        return new UpsertPartitioner(profile, this.context, this.table, this.config);
    }

    public Partitioner getInsertPartitioner(WorkloadProfile profile) {
        return this.getUpsertPartitioner(profile);
    }

    public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPartitionerClass) {
        return (Partitioner)ReflectionUtils.loadClass(layoutPartitionerClass, new Class[]{WorkloadProfile.class, HoodieEngineContext.class, HoodieTable.class, HoodieWriteConfig.class}, new Object[]{profile, this.context, this.table, this.config});
    }

    @Override
    protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        SparkValidatorUtils.runValidators(this.config, writeMetadata, this.context, this.table, this.instantTime);
    }
}

