/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.io.BinaryCopyHandleFactory;
import org.apache.hudi.io.HoodieBinaryCopyHandle;
import org.apache.hudi.parquet.io.ParquetBinaryCopyChecker;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Clustering execution strategy that rewrites the input base files of each clustering group
 * through a {@link HoodieBinaryCopyHandle} instead of going through the parent's
 * {@link SparkSortAndSizeExecutionStrategy} record-level path.
 *
 * <p>The binary copy path is only taken when {@link #supportBinaryStreamCopy(List, Map)} returns
 * {@code true}; otherwise {@link #performClustering} delegates to the parent implementation.
 *
 * @param <T> payload type, passed through to the parent strategy
 */
public class SparkBinaryCopyClusteringExecutionStrategy<T>
extends SparkSortAndSizeExecutionStrategy<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkBinaryCopyClusteringExecutionStrategy.class);

    /**
     * Creates the strategy; all arguments are forwarded unchanged to the parent constructor.
     *
     * @param table         table the clustering runs against
     * @param engineContext engine context used to obtain the Spark context
     * @param writeConfig   write configuration for the clustering write
     */
    public SparkBinaryCopyClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Executes the clustering plan.
     *
     * <p>If {@link #supportBinaryStreamCopy(List, Map)} rejects the plan, the write config is
     * rebuilt from the current properties with the parquet write-legacy-format storage option set
     * to {@code "false"} and the parent implementation is used. Otherwise one Spark partition is
     * created per clustering group and each group is written via {@link #runClusteringForGroup}.
     *
     * @param clusteringPlan plan whose input groups are clustered
     * @param schema         schema handed to each group task (wrapped in a {@link SerializableSchema})
     * @param instantTime    instant time of the clustering action
     * @return write metadata carrying the write statuses produced by all groups
     */
    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream()
            .map(ClusteringGroupInfo::create)
            .collect(Collectors.toList());
        if (!this.supportBinaryStreamCopy(clusteringGroupInfos, clusteringPlan.getStrategy().getStrategyParams())) {
            LOG.info("Required conditions for binary stream copy are currently not satisfied, falling back to default clustering behavior");
            this.writeConfig = HoodieWriteConfig.newBuilder()
                .withProperties((Properties) this.writeConfig.getProps())
                .withStorageConfig(HoodieStorageConfig.newBuilder().parquetWriteLegacyFormat("false").build())
                .build();
            return super.performClustering(clusteringPlan, schema, instantTime);
        }
        LOG.info("Required conditions are currently satisfied, enabling the optimization of using binary stream copy ");
        JavaSparkContext sparkContext = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
        TaskContextSupplier taskContextSupplier = this.getEngineContext().getTaskContextSupplier();
        SerializableSchema serializableSchema = new SerializableSchema(schema);
        boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
        // Spark rejects a request for zero slices, so never ask for fewer than one partition.
        int parallelism = Math.max(1, clusteringGroupInfos.size());
        JavaRDD<ClusteringGroupInfo> groupInfoJavaRDD = sparkContext.parallelize(clusteringGroupInfos, parallelism);
        LOG.info("number of partitions for clustering {}", groupInfoJavaRDD.getNumPartitions());
        JavaRDD<WriteStatus> writeStatusRDD = groupInfoJavaRDD.mapPartitions(clusteringOps -> {
            // Adapt the partition iterator to an Iterable so it can be consumed as a stream.
            Iterable<ClusteringGroupInfo> clusteringOpsIterable = () -> clusteringOps;
            return StreamSupport.stream(clusteringOpsIterable.spliterator(), false)
                .flatMap(clusteringOp -> this.runClusteringForGroup(clusteringOp, clusteringPlan.getStrategy().getStrategyParams(), shouldPreserveMetadata, serializableSchema, taskContextSupplier, instantTime))
                .iterator();
        });
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
        writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
        return writeMetadata;
    }

    /**
     * Writes one clustering group through a single {@link HoodieBinaryCopyHandle}.
     *
     * <p>The handle is created for the partition path of the group's first operation and a newly
     * generated file id prefix, then written and closed.
     *
     * @param clusteringOps          group whose operations supply the input data file paths
     * @param strategyParams         not used by this implementation
     * @param preserveHoodieMetadata not used by this implementation
     * @param schema                 not used by this implementation
     * @param taskContextSupplier    task context supplier passed to the handle factory
     * @param instantTime            instant time passed to the handle factory
     * @return stream over the write statuses returned when the handle is closed
     */
    // The handle factory and handle are used as raw types here, hence the suppression.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clusteringOps, Map<String, String> strategyParams, boolean preserveHoodieMetadata, SerializableSchema schema, TaskContextSupplier taskContextSupplier, String instantTime) {
        List<StoragePath> inputFilePaths = clusteringOps.getOperations().stream()
            .map(op -> new StoragePath(op.getDataFilePath()))
            .collect(Collectors.toList());
        BinaryCopyHandleFactory factory = new BinaryCopyHandleFactory(inputFilePaths);
        // The output partition is taken from the first operation of the group.
        String partitionPath = clusteringOps.getOperations().get(0).getPartitionPath();
        HoodieBinaryCopyHandle handler = factory.create(this.getWriteConfig(), instantTime, this.getHoodieTable(), partitionPath, FSUtils.createNewFileIdPfx(), taskContextSupplier);
        handler.write();
        List<WriteStatus> statuses = handler.close();
        return statuses.stream();
    }

    /**
     * Decides whether the binary copy path can be used for the given input groups.
     *
     * <p>Returns {@code false} when the table is not copy-on-write, when sort columns are present
     * in {@code strategyParams}, when the base file format is not parquet, or when there are no
     * input groups. Otherwise a Spark job collects file info for every input data file via
     * {@link ParquetBinaryCopyChecker#collectFileInfo} and the result of
     * {@link ParquetBinaryCopyChecker#verifyFiles} is returned.
     *
     * @param inputGroups    clustering groups whose data files are inspected
     * @param strategyParams clustering strategy parameters; checked for the sort-columns key
     * @return {@code true} if binary copy may be used, {@code false} to fall back
     */
    public boolean supportBinaryStreamCopy(List<ClusteringGroupInfo> inputGroups, Map<String, String> strategyParams) {
        if (this.getHoodieTable().getMetaClient().getTableType() != HoodieTableType.COPY_ON_WRITE) {
            LOG.warn("SparkBinaryCopyClusteringExecutionStrategy is only supported for COW tables. Will fall back to common clustering execution strategy.");
            return false;
        }
        if (strategyParams.get(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key()) != null) {
            LOG.warn("SparkBinaryCopyClusteringExecutionStrategy does not support sort by columns. Will fall back to common clustering execution strategy.");
            return false;
        }
        if (!this.getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat().equals(HoodieFileFormat.PARQUET)) {
            LOG.warn("SparkBinaryCopyClusteringExecutionStrategy only supports parquet base files. Will fall back to common clustering execution strategy.");
            return false;
        }
        if (inputGroups.isEmpty()) {
            // Parallelizing with zero slices makes Spark fail; with nothing to copy, defer to the parent strategy.
            LOG.warn("SparkBinaryCopyClusteringExecutionStrategy received no input groups. Will fall back to common clustering execution strategy.");
            return false;
        }
        JavaSparkContext sparkContext = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
        return ParquetBinaryCopyChecker.verifyFiles(
            sparkContext.parallelize(inputGroups, inputGroups.size())
                .flatMap(group -> group.getOperations().iterator())
                .map(op -> {
                    String filePath = op.getDataFilePath();
                    return ParquetBinaryCopyChecker.collectFileInfo((Configuration) this.getHoodieTable().getStorageConf().unwrapAs(Configuration.class), filePath);
                })
                .collect());
    }
}

