/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;

/**
 * Clustering execution strategy that executes every clustering group of a plan inside a single Spark job.
 *
 * <p>The plan's input groups are parallelized with one Spark partition per group. On the executors, only the
 * base (data) file of each {@link ClusteringOperation} in a group is read; delta file paths are not read here.
 * Each record is converted into a {@link HoodieRecord}, and subclasses decide how the records are rewritten by
 * implementing {@link #performClusteringWithRecordsIterator}.
 *
 * @param <T> payload type declared for the produced records; records read from base files are always built
 *            with a {@link RewriteAvroPayload} (see {@code transform})
 */
public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(SingleSparkJobExecutionStrategy.class);

    /** Hadoop property whose driver-side value is broadcast to the executors. */
    private static final String UMASK_CONF_KEY = "fs.permissions.umask-mode";

    public SingleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Builds a lazily evaluated RDD of write statuses covering every input group of {@code clusteringPlan}.
     *
     * @param clusteringPlan plan whose input groups are clustered, one Spark partition per group
     * @param schema         schema handed to {@link #performClusteringWithRecordsIterator}
     * @param instantTime    instant time of this clustering action
     * @return write metadata whose write statuses are the (not yet computed) clustering results
     */
    @Override
    public HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
        TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
        SerializableSchema serializableSchema = new SerializableSchema(schema);
        List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream()
                .map(ClusteringGroupInfo::create)
                .collect(Collectors.toList());

        String umask = engineContext.hadoopConfiguration().get(UMASK_CONF_KEY);
        Broadcast<String> umaskBroadcastValue = engineContext.broadcast(umask);

        // One partition per clustering group. Spark rejects a non-positive slice count, so an empty plan
        // still gets a single (empty) partition instead of failing when the partitions are computed below.
        int numPartitions = Math.max(1, clusteringGroupInfos.size());
        JavaRDD<ClusteringGroupInfo> groupInfoJavaRDD = engineContext.parallelize(clusteringGroupInfos, numPartitions);
        LOG.info("number of partitions for clustering " + groupInfoJavaRDD.getNumPartitions());

        JavaRDD<WriteStatus> writeStatusRDD = groupInfoJavaRDD.mapPartitions(clusteringOps -> {
            // NOTE(review): this Configuration is never passed to the readers or writers used below, so the
            // broadcast umask does not appear to take effect from here; confirm whether it should be threaded through.
            Configuration configuration = new Configuration();
            String executorUmask = umaskBroadcastValue.getValue();
            if (executorUmask != null) {
                // Hadoop's Configuration.set rejects null values.
                configuration.set(UMASK_CONF_KEY, executorUmask);
            }
            Iterable<ClusteringGroupInfo> clusteringOpsIterable = () -> clusteringOps;
            List<ClusteringGroupInfo> groupsInPartition = StreamSupport.stream(clusteringOpsIterable.spliterator(), false)
                    .collect(Collectors.toList());
            return groupsInPartition.stream()
                    .flatMap(clusteringOp -> runClusteringForGroup(
                            clusteringOp,
                            clusteringPlan.getStrategy().getStrategyParams(),
                            Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
                            serializableSchema,
                            taskContextSupplier,
                            instantTime))
                    .iterator();
        });

        HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
        writeMetadata.setWriteStatuses(writeStatusRDD);
        return writeMetadata;
    }

    /**
     * Clusters a single group: reads its base-file records and delegates the rewrite to the subclass.
     *
     * @return the flattened write statuses produced for the group
     */
    private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clusteringOps, Map<String, String> strategyParams, boolean preserveHoodieMetadata, SerializableSchema schema, TaskContextSupplier taskContextSupplier, String instantTime) {
        List<HoodieFileGroupId> inputFileIds = clusteringOps.getOperations().stream()
                .map(op -> new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))
                .collect(Collectors.toList());
        Iterator<HoodieRecord<T>> inputRecords = readRecordsForGroupBaseFiles(clusteringOps.getOperations());
        Iterator<List<WriteStatus>> writeStatuses = performClusteringWithRecordsIterator(inputRecords,
                clusteringOps.getNumOutputGroups(), instantTime, strategyParams, schema.get(), inputFileIds,
                preserveHoodieMetadata, taskContextSupplier);
        Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses;
        return StreamSupport.stream(writeStatusIterable.spliterator(), false).flatMap(List::stream);
    }

    /**
     * Rewrites the records of one clustering group.
     *
     * @param records                records read from the group's base files
     * @param numOutputGroups        number of output groups requested by the plan for this group
     * @param instantTime            instant time of this clustering action
     * @param strategyParams         strategy parameters from the clustering plan
     * @param schema                 schema passed to {@link #performClustering}
     * @param fileGroupIdList        file groups being clustered
     * @param preserveHoodieMetadata plan flag; {@code false} when the plan leaves it unset
     * @param taskContextSupplier    task context supplier of the engine
     * @return iterator of write-status batches; the caller flattens the batches
     */
    public abstract Iterator<List<WriteStatus>> performClusteringWithRecordsIterator(Iterator<HoodieRecord<T>> records, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean preserveHoodieMetadata, TaskContextSupplier taskContextSupplier);

    /**
     * Reads the base file of every operation and concatenates the resulting records.
     *
     * <p>NOTE(review): the file readers opened here are never closed. {@code Iterable#spliterator}'s default
     * implementation calls {@code iterator()} immediately, so every base file of the group is opened up front.
     */
    private Iterator<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
        // Identical for every operation and every record, so build each once rather than per operation/record.
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
        Option<BaseKeyGenerator> keyGeneratorOpt = createKeyGeneratorIfNeeded();

        List<Iterator<HoodieRecord<T>>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
            Iterable<IndexedRecord> indexedRecords = () -> {
                try {
                    return HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))
                            .getRecordIterator(readerSchema);
                } catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            };
            return StreamSupport.stream(indexedRecords.spliterator(), false)
                    .map(record -> transform(record, keyGeneratorOpt))
                    .iterator();
        }).collect(Collectors.toList());
        return new ConcatenatingIterator<>(iteratorsForPartition);
    }

    /**
     * Creates the key generator used to extract record keys and partition paths.
     *
     * @return empty when meta fields are populated; otherwise the configured key generator
     * @throws HoodieIOException if the key generator cannot be created
     */
    private Option<BaseKeyGenerator> createKeyGeneratorIfNeeded() {
        if (getWriteConfig().populateMetaFields()) {
            return Option.empty();
        }
        try {
            return Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
        } catch (IOException e) {
            // NOTE(review): the message mentions BaseKeyGenerator, but it is raised for an IOException; a
            // non-BaseKeyGenerator would instead surface as a ClassCastException from the cast above.
            throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
        }
    }

    /**
     * Converts a record read from a base file into a {@link HoodieRecord} wrapping a {@link RewriteAvroPayload}.
     *
     * @param indexedRecord   record read from the base file (expected to be a {@link GenericRecord})
     * @param keyGeneratorOpt key generator from {@link #createKeyGeneratorIfNeeded()}; empty lets
     *                        {@link KeyGenUtils} resolve key and partition without one
     */
    @SuppressWarnings("unchecked")
    private HoodieRecord<T> transform(IndexedRecord indexedRecord, Option<BaseKeyGenerator> keyGeneratorOpt) {
        GenericRecord record = (GenericRecord) indexedRecord;
        String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
        String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
        HoodieKey hoodieKey = new HoodieKey(key, partition);
        // The payload is always a RewriteAvroPayload regardless of T; the cast is unchecked but erases to
        // HoodieRecordPayload, which RewriteAvroPayload implements.
        T avroPayload = (T) new RewriteAvroPayload(record);
        return new HoodieRecord<>(hoodieKey, avroPayload);
    }
}

