/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.compact;

import java.io.IOException;
import java.util.List;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor;
import org.apache.spark.api.java.JavaRDD;

/**
 * Action executor that runs a previously requested compaction on Spark.
 *
 * <p>{@link #execute()} looks up the requested compaction instant for {@code instantTime},
 * runs {@link HoodieSparkMergeOnReadTableCompactor} against the stored compaction plan and
 * returns the resulting write statuses together with commit metadata assembled from them.
 *
 * @param <T> payload type of the records handled by the table
 */
public class SparkRunCompactionActionExecutor<T extends HoodieRecordPayload>
extends BaseActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata<JavaRDD<WriteStatus>>> {
    /**
     * Creates the executor; all arguments are forwarded unchanged to the base class.
     *
     * @param context     Spark engine context used to run the compaction
     * @param config      write configuration
     * @param table       table to compact
     * @param instantTime instant time of the compaction to run
     */
    public SparkRunCompactionActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, String instantTime) {
        super(context, config, table, instantTime);
    }

    /**
     * Runs the compaction requested at {@code instantTime}.
     *
     * @return write metadata holding the persisted write statuses and the commit metadata
     *         built from their write stats; it is returned with {@code committed} set to
     *         {@code false}
     * @throws IllegalStateException     if the pending compaction timeline does not contain
     *                                   the requested instant for {@code instantTime}
     * @throws HoodieCompactionException if an {@link IOException} is raised while reading the
     *                                   plan or compacting
     */
    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
        HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(this.instantTime);
        HoodieTimeline pendingCompactionTimeline = this.table.getActiveTimeline().filterPendingCompactionTimeline();
        if (!pendingCompactionTimeline.containsInstant(instant)) {
            throw new IllegalStateException("No Compaction request available at " + this.instantTime + " to run compaction");
        }
        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
        try {
            HoodieActiveTimeline timeline = this.table.getActiveTimeline();
            // The plan is read before the instant is moved from requested to inflight.
            HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(this.table.getMetaClient(), this.instantTime);
            timeline.transitionCompactionRequestedToInflight(instant);
            // Reload so the meta client's timeline reflects the transition made just above.
            this.table.getMetaClient().reloadActiveTimeline();

            HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
            JavaRDD<WriteStatus> statuses = compactor.compact(this.context, compactionPlan, this.table, this.config, this.instantTime);
            // Persisted before the collect() below; the same RDD is also handed back to the caller.
            statuses.persist(SparkMemoryUtils.getWriteStatusStorageLevel(this.config.getProps()));

            this.context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
            // Typed list: a raw List would make the enhanced-for below iterate over Object.
            List<HoodieWriteStat> writeStats = statuses.map(WriteStatus::getStat).collect();
            // NOTE(review): the `true` flag presumably marks this metadata as coming from a
            // compaction -- confirm against the HoodieCommitMetadata constructor.
            HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
            for (HoodieWriteStat stat : writeStats) {
                metadata.addWriteStat(stat.getPartitionPath(), stat);
            }
            metadata.addMetadata("schema", this.config.getSchema());

            compactionMetadata.setWriteStatuses(statuses);
            // Nothing is committed here; presumably the caller completes the commit -- verify.
            compactionMetadata.setCommitted(false);
            compactionMetadata.setCommitMetadata(Option.of(metadata));
        }
        catch (IOException e) {
            throw new HoodieCompactionException("Could not compact " + this.config.getBasePath(), e);
        }
        return compactionMetadata;
    }
}

