/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.AbstractDeleteHelper;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class SparkDeleteHelper<T extends HoodieRecordPayload, R>
extends AbstractDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
    private SparkDeleteHelper() {
    }

    public static SparkDeleteHelper newInstance() {
        return DeleteHelperHolder.SPARK_DELETE_HELPER;
    }

    @Override
    public JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys2, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, int parallelism) {
        boolean isIndexingGlobal = table.getIndex().isGlobal();
        if (isIndexingGlobal) {
            return keys2.keyBy(HoodieKey::getRecordKey).reduceByKey((Function2 & Serializable)(key1, key2) -> key1, parallelism).values();
        }
        return keys2.distinct(parallelism);
    }

    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(String instantTime, JavaRDD<HoodieKey> keys2, HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> deleteExecutor) {
        JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
        try {
            HoodieWriteMetadata<Object> result = null;
            JavaRDD dedupedKeys = keys2;
            int parallelism = config.getDeleteShuffleParallelism();
            if (config.shouldCombineBeforeDelete()) {
                dedupedKeys = this.deduplicateKeys(keys2, table, parallelism);
            } else if (!keys2.partitions().isEmpty()) {
                dedupedKeys = keys2.repartition(parallelism);
            }
            JavaRDD dedupedRecords = dedupedKeys.map((Function & Serializable)key -> new HoodieRecord<EmptyHoodieRecordPayload>((HoodieKey)key, new EmptyHoodieRecordPayload()));
            Instant beginTag = Instant.now();
            JavaRDD<HoodieRecord<T>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
            Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
            JavaRDD taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
            if (!taggedValidRecords.isEmpty()) {
                result = deleteExecutor.execute(taggedValidRecords);
                result.setIndexLookupDuration(tagLocationDuration);
            } else {
                deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap(), new WorkloadStat())), instantTime);
                result = new HoodieWriteMetadata();
                result.setWriteStatuses((JavaRDD<WriteStatus>)jsc.emptyRDD());
                deleteExecutor.commitOnAutoCommit(result);
            }
            return result;
        }
        catch (Throwable e) {
            if (e instanceof HoodieUpsertException) {
                throw (HoodieUpsertException)e;
            }
            throw new HoodieUpsertException("Failed to delete for commit time " + instantTime, e);
        }
    }

    private static class DeleteHelperHolder {
        private static final SparkDeleteHelper SPARK_DELETE_HELPER = new SparkDeleteHelper();

        private DeleteHelperHolder() {
        }
    }
}

