/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SparkHoodiePartitioner;
import org.apache.hudi.table.action.commit.UpsertPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public abstract class BaseSparkCommitActionExecutor<T>
extends BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
    // SLF4J logger shared by every instance of this executor.
    private static final Logger LOG = LoggerFactory.getLogger(BaseSparkCommitActionExecutor.class);
    // Optional key generator created from the write config in the constructor;
    // it is handed to the merge-handle factory in getUpdateHandle.
    protected final Option<BaseKeyGenerator> keyGeneratorOpt;

    /**
     * Creates an executor that carries no extra commit metadata.
     *
     * <p>Delegates to the six-argument constructor, supplying an empty
     * {@link Option} for {@code extraMetadata}.
     *
     * @param context       engine context forwarded to the six-argument constructor
     * @param config        write configuration
     * @param table         table the write targets
     * @param instantTime   instant time of this write
     * @param operationType write operation being executed
     */
    public BaseSparkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) {
        this(context, config, table, instantTime, operationType, Option.<Map<String, String>>empty());
    }

    /**
     * Creates an executor with optional extra commit metadata.
     *
     * <p>All arguments are passed to the superclass constructor; afterwards the
     * key generator option is built from {@code config}.
     *
     * @param context       engine context
     * @param config        write configuration, also used to create the key generator
     * @param table         table the write targets
     * @param instantTime   instant time of this write
     * @param operationType write operation being executed
     * @param extraMetadata optional additional metadata, forwarded to the superclass
     */
    public BaseSparkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option<Map<String, String>> extraMetadata) {
        super(context, config, table, instantTime, operationType, extraMetadata);
        keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
    }

    /**
     * Runs the configured clustering update strategy over the incoming records.
     *
     * <p>Flow:
     * <ol>
     *   <li>If the file-system view reports no file groups in pending clustering,
     *       the input is returned untouched.</li>
     *   <li>The update strategy class named in the config is instantiated
     *       reflectively. If it is a {@link SparkAllowUpdateStrategy} and rollback
     *       of pending clustering is disabled, the input is returned untouched.</li>
     *   <li>Otherwise the strategy handles the records. When it reports file groups
     *       that both receive updates and are pending clustering, and rollback is
     *       enabled, every pending clustering instant owning one of those file
     *       groups is rolled back and the active timeline is reloaded.</li>
     * </ol>
     *
     * @param inputRecords records about to be written
     * @return {@code inputRecords} on the early-exit paths, otherwise the records
     *         returned by the update strategy
     */
    @SuppressWarnings("unchecked")
    private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
        context.setJobStatus(getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
        Set<?> pendingClusteringFileGroups = table.getFileSystemView()
            .getFileGroupsInPendingClustering()
            .map(Pair::getKey)
            .collect(Collectors.toSet());
        if (pendingClusteringFileGroups.isEmpty()) {
            return inputRecords;
        }

        // The strategy is loaded reflectively, so the cast below cannot be checked by the compiler.
        String strategyClassName = config.getClusteringUpdatesStrategyClass();
        Class<?>[] constructorArgTypes = {HoodieEngineContext.class, HoodieTable.class, Set.class};
        Object[] constructorArgs = {context, table, pendingClusteringFileGroups};
        UpdateStrategy<T, HoodieData<HoodieRecord<T>>> strategy =
            (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils.loadClass(strategyClassName, constructorArgTypes, constructorArgs);
        if (strategy instanceof SparkAllowUpdateStrategy && !config.isRollbackPendingClustering()) {
            return inputRecords;
        }

        Pair<HoodieData<HoodieRecord<T>>, ? extends Set<?>> outcome = strategy.handleUpdate(inputRecords);
        Set<?> fileGroupsWithUpdates = outcome.getRight();
        if (!fileGroupsWithUpdates.isEmpty() && config.isRollbackPendingClustering()) {
            Set<HoodieInstant> instantsToRollback = ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(table.getMetaClient())
                .entrySet()
                .stream()
                .filter(entry -> fileGroupsWithUpdates.contains(entry.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toSet());
            for (HoodieInstant pendingInstant : instantsToRollback) {
                // Each rollback is scheduled and executed under its own freshly created instant time.
                String rollbackInstantTime = table.getMetaClient().createNewInstantTime();
                table.scheduleRollback(context, rollbackInstantTime, pendingInstant, false, config.shouldRollbackUsingMarkers(), false);
                table.rollback(context, rollbackInstantTime, pendingInstant, true, true);
            }
            table.getMetaClient().reloadActiveTimeline();
        }
        return outcome.getLeft();
    }

    /**
     * Executes the write for the given records and returns its metadata.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Persist the input RDD at the configured tagged-record storage level
     *       when its current storage level equals {@code StorageLevel.NONE()};
     *       otherwise log the level it already has.</li>
     *   <li>Run the clustering update handling over the input.</li>
     *   <li>Build the workload profile, timing this phase.</li>
     *   <li>Obtain the partitioner and save the profile to the inflight instant.</li>
     *   <li>Partition and write the records, then update the index and commit if needed.</li>
     * </ol>
     *
     * @param inputRecords records to write
     * @return write metadata, including the time spent building the workload profile
     */
    public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
        JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
        // Compare by value rather than identity so that an equal-valued NONE level held in a
        // different StorageLevel instance is still recognised as "not persisted".
        if (StorageLevel.NONE().equals(inputRDD.getStorageLevel())) {
            HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(), context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime));
        } else {
            LOG.info("RDD PreppedRecords was persisted at: {}", inputRDD.getStorageLevel());
        }

        HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);

        context.setJobStatus(getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
        HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start();
        WorkloadProfile workloadProfile = new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
        // Parameterized logging: the profile is only rendered to a string when debug is enabled.
        LOG.debug("Input workload profile :{}", workloadProfile);
        long sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.endTimer();
        LOG.info("Source read and index timer {}", sourceReadAndIndexDurationMs);

        Partitioner partitioner = getPartitioner(workloadProfile);
        saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);

        context.setJobStatus(getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
        HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
        HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
        updateIndexAndCommitIfNeeded(writeStatuses, result);
        result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
        return result;
    }

    /**
     * Counts the incoming records per (partition path, current location) pair and
     * folds the counts into per-partition and global workload statistics.
     *
     * <p>A record whose current location is present is counted as an update to
     * that location; a record without one is counted as an insert.
     *
     * @param inputRecords records to profile
     * @return pair of (per-partition-path stats, global stats)
     */
    private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(HoodieData<HoodieRecord<T>> inputRecords) {
        HashMap<String, WorkloadStat> statsByPartitionPath = new HashMap<>();
        WorkloadStat globalStat = new WorkloadStat();

        Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> countsByPartitionAndLocation = inputRecords
            .mapToPair(record -> Pair.of(
                new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
            .countByKey();

        for (Map.Entry<Tuple2<String, Option<HoodieRecordLocation>>, Long> entry : countsByPartitionAndLocation.entrySet()) {
            String partitionPath = entry.getKey()._1();
            long count = entry.getValue();
            Option<HoodieRecordLocation> location = entry.getKey()._2();

            WorkloadStat partitionStat = statsByPartitionPath.computeIfAbsent(partitionPath, ignored -> new WorkloadStat());
            if (location.isPresent()) {
                partitionStat.addUpdates(location.get(), count);
                globalStat.addUpdates(location.get(), count);
            } else {
                partitionStat.addInserts(count);
                globalStat.addInserts(count);
            }
        }
        return Pair.of(statsByPartitionPath, globalStat);
    }

    /**
     * Chooses the partitioner for this write.
     *
     * <p>A partitioner class declared by the table's storage layout takes
     * precedence. Without one, the upsert partitioner is used when the operation
     * type changes records and the insert partitioner otherwise.
     *
     * @param profile workload profile of the incoming records
     * @return the partitioner to use
     */
    protected Partitioner getPartitioner(WorkloadProfile profile) {
        Option<String> layoutPartitionerClass = table.getStorageLayout().layoutPartitionerClass();
        if (layoutPartitionerClass.isPresent()) {
            return getLayoutPartitioner(profile, layoutPartitionerClass.get());
        }
        return WriteOperationType.isChangingRecords(operationType)
            ? getUpsertPartitioner(profile)
            : getInsertPartitioner(profile);
    }

    /**
     * Partitions the records with {@code partitioner} and writes each resulting
     * partition, returning the flattened write statuses.
     *
     * <p>Records are keyed by (record key, optional current location). When the
     * table requires sorted records, partitioning and sorting by record key are
     * done together via {@code repartitionAndSortWithinPartitions}; otherwise the
     * records are only partitioned. Each partition is then handled by the upsert
     * path when the operation type changes records, and by the insert path
     * otherwise.
     *
     * @param dedupedRecords records to write
     * @param partitioner    partitioner deciding the target partition of each record
     * @return write statuses produced by all partitions
     */
    private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
        JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> keyedRDD = HoodieJavaPairRDD.getJavaPairRDD(
            dedupedRecords.mapToPair(record -> Pair.of(
                new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)));

        JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> partitionedRDD;
        if (table.requireSortedRecords()) {
            // The Serializable intersection is kept so the comparator lambda can be serialized.
            Comparator<Tuple2<HoodieKey, Option<HoodieRecordLocation>>> byRecordKey =
                (Comparator<Tuple2<HoodieKey, Option<HoodieRecordLocation>>> & Serializable) (left, right) ->
                    left._1.getRecordKey().compareTo(right._1.getRecordKey());
            partitionedRDD = keyedRDD.repartitionAndSortWithinPartitions(partitioner, byRecordKey);
        } else {
            partitionedRDD = keyedRDD.partitionBy(partitioner);
        }

        JavaRDD<List<WriteStatus>> statusesPerHandle = partitionedRDD
            .map(Tuple2::_2)
            .mapPartitionsWithIndex((partition, recordItr) ->
                WriteOperationType.isChangingRecords(operationType)
                    ? handleUpsertPartition(instantTime, partition, recordItr, partitioner)
                    : handleInsertPartition(instantTime, partition, recordItr, partitioner), true);
        return HoodieJavaRDD.of(statusesPerHandle.flatMap(List::iterator));
    }

    /**
     * Persists the write statuses, asks the table's index to update record
     * locations, and records the outcome on {@code result}.
     *
     * <p>The storage level for persisting is read from the
     * {@code WRITE_STATUS_STORAGE_LEVEL_VALUE} config. The wall-clock time spent in
     * the {@code updateLocation} call is stored as the index update duration.
     *
     * @param writeStatuses statuses produced by the write
     * @param result        metadata object that receives the duration and the updated statuses
     * @return the statuses returned by the index
     */
    protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        String storageLevel = config.getString(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE);
        writeStatuses.persist(storageLevel, context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime));

        Instant startedAt = Instant.now();
        HoodieData<WriteStatus> indexedStatuses = table.getIndex().updateLocation(writeStatuses, context, table, instantTime);
        Instant finishedAt = Instant.now();

        result.setIndexUpdateDuration(Duration.between(startedAt, finishedAt));
        result.setWriteStatuses(indexedStatuses);
        return indexedStatuses;
    }

    /**
     * Updates the index, records the replaced file ids on {@code result}, and
     * then hands {@code result} to {@code commitOnAutoCommit}.
     *
     * @param writeStatusRDD statuses produced by the write
     * @param result         metadata object populated by this call
     */
    protected void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatusRDD, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        // The statuses returned by updateIndex are already stored on result, so the return value is not needed here.
        updateIndex(writeStatusRDD, result);
        Map<String, List<String>> replacedFileIds = getPartitionToReplacedFileIds(result);
        result.setPartitionToReplaceFileIds(replacedFileIds);
        commitOnAutoCommit(result);
    }

    /**
     * Returns the commit action type reported by the table's meta client.
     *
     * @return commit action type of the target table
     */
    protected String getCommitActionType() {
        HoodieTableMetaClient metaClient = table.getMetaClient();
        return metaClient.getCommitActionType();
    }

    /**
     * Collects the write stats from the write statuses held by {@code result} and
     * stores both the stats and the commit metadata built from them on
     * {@code result}.
     *
     * @param result metadata object to read write statuses from and to populate
     */
    protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        // Raw List on purpose: the element type is whatever WriteStatus::getStat returns,
        // and that type is not among this file's imports.
        List writeStats = result.getWriteStatuses().map(WriteStatus::getStat).collectAsList();
        result.setWriteStats(writeStats);

        Map<String, List<String>> replacedFileIds = result.getPartitionToReplaceFileIds();
        String schema = getSchemaToStoreInCommit();
        String actionType = getCommitActionType();
        result.setCommitMetadata(Option.of(
            CommitUtils.buildMetadata(writeStats, replacedFileIds, extraMetadata, operationType, schema, actionType)));
    }

    /**
     * Commits the write described by {@code result}.
     *
     * <p>Uses the write stats already stored on {@code result} when present;
     * otherwise collects them from the write statuses. Either way the work is
     * delegated to the two-argument {@code commit} overload.
     *
     * @param result metadata of the write to commit
     */
    protected void commit(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
        context.setJobStatus(getClass().getSimpleName(), "Commit write status collect: " + config.getTableName());
        if (result.getWriteStats().isPresent()) {
            commit(result, result.getWriteStats().get());
        } else {
            commit(result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
        }
    }

    /**
     * Returns the file ids replaced by this write, keyed by partition.
     *
     * <p>This implementation ignores its argument and always returns an empty map.
     *
     * @param writeStatuses metadata of the write (unused here)
     * @return an empty map
     */
    protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeStatuses) {
        return Collections.<String, List<String>>emptyMap();
    }

    /**
     * Writes one partition of records according to the bucket the partitioner
     * assigned to it.
     *
     * <p>An {@code INSERT} bucket is routed to {@code handleInsert} and an
     * {@code UPDATE} bucket to {@code handleUpdate}. Any other bucket type, and
     * any {@link Throwable} raised while handling the bucket, is logged and
     * rethrown wrapped in a {@link HoodieUpsertException}.
     *
     * @param instantTime instant time of the write (not read by this implementation)
     * @param partition   index of the partition being written
     * @param recordItr   records of the partition
     * @param partitioner partitioner used for the write; cast to {@link SparkHoodiePartitioner}
     * @return iterator over the write statuses produced for this partition
     * @throws HoodieUpsertException on an unknown bucket type or any failure while writing
     */
    protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        BucketInfo bucketInfo = ((SparkHoodiePartitioner) partitioner).getBucketInfo(partition);
        BucketType bucketType = bucketInfo.bucketType;
        try {
            if (bucketType.equals(BucketType.INSERT)) {
                return handleInsert(bucketInfo.fileIdPrefix, recordItr);
            } else if (bucketType.equals(BucketType.UPDATE)) {
                return handleUpdate(bucketInfo.partitionPath, bucketInfo.fileIdPrefix, recordItr);
            } else {
                throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partition);
            }
        } catch (Throwable failure) {
            String msg = "Error upserting bucketType " + bucketType + " for partition :" + partition;
            LOG.error(msg, failure);
            throw new HoodieUpsertException(msg, failure);
        }
    }

    /**
     * Writes one partition of records for an operation that does not change
     * existing records.
     *
     * <p>This implementation delegates to {@code handleUpsertPartition} with the
     * same arguments.
     *
     * @param instantTime instant time of the write
     * @param partition   index of the partition being written
     * @param recordItr   records of the partition
     * @param partitioner partitioner used for the write
     * @return iterator over the write statuses produced for this partition
     */
    protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        Iterator<List<WriteStatus>> statuses = handleUpsertPartition(instantTime, partition, recordItr, partitioner);
        return statuses;
    }

    /**
     * Applies the given records as updates to one file group.
     *
     * <p>An empty iterator yields no write statuses. When the file group has no
     * latest base file and the configured index type is {@code BUCKET}, the
     * records are written through {@code handleInsert} instead. Otherwise a merge
     * handle is created and the merge is run.
     *
     * @param partitionPath partition path of the file group
     * @param fileId        id of the file group
     * @param recordItr     records destined for the file group
     * @return iterator over the resulting write statuses
     * @throws IOException declared by the merge path ({@code handleUpdateInternal})
     */
    public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        if (!recordItr.hasNext()) {
            LOG.info("Empty partition with fileId => " + fileId);
            return Collections.emptyIterator();
        }

        boolean baseFileMissing = !table.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).isPresent();
        // The index type is consulted only when there is no base file, as in the short-circuit it replaces.
        if (baseFileMissing && HoodieIndex.IndexType.BUCKET.equals(config.getIndexType())) {
            return handleInsert(fileId, recordItr);
        }

        return handleUpdateInternal(getUpdateHandle(partitionPath, fileId, recordItr), fileId);
    }

    /**
     * Runs the merge for the given handle on the table and returns the handle's
     * write statuses.
     *
     * @param upsertHandle merge handle to run
     * @param fileId       id of the file group being merged
     * @return iterator over the write statuses reported by the handle
     * @throws IOException declared by this method; presumably raised by the merge — confirm against {@code HoodieTable.runMerge}
     */
    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId) throws IOException {
        table.runMerge(upsertHandle, instantTime, fileId);
        Iterator<List<WriteStatus>> statuses = upsertHandle.getWriteStatusesAsIterator();
        return statuses;
    }

    /**
     * Creates the merge handle used to apply the given records to a file group.
     *
     * <p>Delegates to {@link HoodieMergeHandleFactory}, passing this executor's
     * operation type, config, instant time, table, task context supplier and key
     * generator option.
     *
     * @param partitionPath partition path of the file group
     * @param fileId        id of the file group
     * @param recordItr     records destined for the file group
     * @return the merge handle built by the factory
     */
    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        return HoodieMergeHandleFactory.create(
            operationType,
            config,
            instantTime,
            table,
            recordItr,
            partitionPath,
            fileId,
            taskContextSupplier,
            keyGeneratorOpt);
    }

    /**
     * Writes the given records as inserts.
     *
     * <p>A non-empty iterator is wrapped in a {@link SparkLazyInsertIterable}
     * backed by a {@link CreateHandleFactory}; an empty one is logged and yields
     * no write statuses.
     *
     * @param idPfx     file id prefix for the files to create
     * @param recordItr records to insert
     * @return iterator over the resulting write statuses
     */
    public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
        if (recordItr.hasNext()) {
            return new SparkLazyInsertIterable<T>(recordItr, true, config, instantTime, table, idPfx, taskContextSupplier, new CreateHandleFactory<>());
        }
        LOG.info("Empty partition");
        return Collections.emptyIterator();
    }

    /**
     * Builds the partitioner used for upserts.
     *
     * @param profile workload profile of the incoming records; must not be {@code null}
     * @return a new {@link UpsertPartitioner}
     * @throws HoodieUpsertException if {@code profile} is {@code null}
     */
    public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        if (profile != null) {
            return new UpsertPartitioner<>(profile, context, table, config, operationType);
        }
        throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
    }

    /**
     * Builds the partitioner used for inserts.
     *
     * <p>This implementation delegates to {@code getUpsertPartitioner}.
     *
     * @param profile workload profile of the incoming records
     * @return the partitioner produced by {@code getUpsertPartitioner}
     */
    public Partitioner getInsertPartitioner(WorkloadProfile profile) {
        Partitioner insertPartitioner = getUpsertPartitioner(profile);
        return insertPartitioner;
    }

    /**
     * Reflectively instantiates the partitioner class dictated by the storage layout.
     *
     * <p>The class is loaded through a constructor taking
     * ({@link WorkloadProfile}, {@link HoodieEngineContext}, {@link HoodieTable},
     * {@link HoodieWriteConfig}).
     *
     * @param profile                workload profile of the incoming records
     * @param layoutPartitionerClass fully qualified name of the partitioner class to load
     * @return the instantiated partitioner
     */
    public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPartitionerClass) {
        Class<?>[] constructorArgTypes = {WorkloadProfile.class, HoodieEngineContext.class, HoodieTable.class, HoodieWriteConfig.class};
        Object[] constructorArgs = {profile, context, table, config};
        return (Partitioner) ReflectionUtils.loadClass(layoutPartitionerClass, constructorArgTypes, constructorArgs);
    }

    /**
     * Runs the pre-commit validators for this write via {@link SparkValidatorUtils}.
     *
     * @param writeMetadata metadata of the write to validate
     */
    protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
        SparkValidatorUtils.runValidators(
            config,
            writeMetadata,
            context,
            table,
            instantTime);
    }
}

