/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.MappingIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;

/**
 * Clustering execution strategy that processes every input group of a clustering plan in its own
 * asynchronous task and unions the per-group write statuses into a single result.
 *
 * <p>Reading the input of a group is handled here (either as an RDD of {@link HoodieRecord}s or as a
 * {@code Dataset<Row>}); rewriting it is delegated to subclasses through
 * {@link #performClusteringWithRecordsRDD} and {@link #performClusteringWithRecordsAsRow}.
 *
 * @param <T> record payload type
 */
public abstract class MultipleSparkJobExecutionStrategy<T>
extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
    private static final Logger LOG = LogManager.getLogger(MultipleSparkJobExecutionStrategy.class);

    public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Clusters every input group of {@code clusteringPlan} and gathers the resulting write statuses.
     *
     * <p>Each group is submitted asynchronously. When
     * {@code hoodie.datasource.write.row.writer.enable} is {@code true} a group is read and written as a
     * {@code Dataset<Row>}; otherwise as an RDD of records. All futures are joined before the per-group
     * RDDs are unioned.
     *
     * @param clusteringPlan plan whose input groups are clustered
     * @param schema         not read by this implementation
     * @param instantTime    instant time of the clustering operation
     * @return write metadata holding the union of all groups' write statuses
     */
    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
        boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
        // Both values are the same for every group, so read them once instead of once per group.
        boolean useRowWriter = this.getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false);
        Map<String, String> strategyParams = clusteringPlan.getStrategy().getStrategyParams();

        List<CompletableFuture<HoodieData<WriteStatus>>> groupFutures = clusteringPlan.getInputGroups().stream()
            .map(inputGroup -> useRowWriter
                ? this.runClusteringForGroupAsyncAsRow(inputGroup, strategyParams, shouldPreserveMetadata, instantTime)
                : this.runClusteringForGroupAsync(inputGroup, strategyParams, shouldPreserveMetadata, instantTime))
            .collect(Collectors.toList());
        Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(groupFutures).join().stream();

        JavaRDD<WriteStatus>[] writeStatuses = this.convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
        // NOTE(review): Spark's JavaSparkContext.union may reject an empty array; confirm a plan always
        // carries at least one input group.
        JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
        HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
        writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
        return writeMetadata;
    }

    /**
     * Rewrites the rows read for one clustering group.
     *
     * @param inputRecords                 rows read from the group's file slices
     * @param numOutputGroups              number of output file groups requested by the group
     * @param instantTime                  instant time of the clustering operation
     * @param strategyParams               strategy parameters from the clustering plan
     * @param schema                       writer schema with Hudi metadata fields added
     * @param fileGroupIdList              ids of the file groups being clustered
     * @param shouldPreserveHoodieMetadata whether Hudi metadata should be preserved
     * @param extraMetadata                extra metadata attached to the group
     * @return write statuses produced for the group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata);

    /**
     * Rewrites the records read for one clustering group.
     *
     * @param inputRecords                 records read from the group's file slices
     * @param numOutputGroups              number of output file groups requested by the group
     * @param instantTime                  instant time of the clustering operation
     * @param strategyParams               strategy parameters from the clustering plan
     * @param schema                       writer schema with Hudi metadata fields added
     * @param fileGroupIdList              ids of the file groups being clustered
     * @param shouldPreserveHoodieMetadata whether Hudi metadata should be preserved
     * @param extraMetadata                extra metadata attached to the group
     * @return write statuses produced for the group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata);

    /** Returns the partitioner to use when clustering with the row writer. */
    protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams, Schema schema) {
        return this.getPartitioner(strategyParams, schema, true);
    }

    /** Returns the partitioner to use when clustering with an RDD of records. */
    protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams, Schema schema) {
        return this.getPartitioner(strategyParams, schema, false);
    }

    /**
     * Chooses the bulk-insert partitioner for the clustered output.
     *
     * <p>If the strategy params contain {@code PLAN_STRATEGY_SORT_COLUMNS} (a comma-separated list), a
     * sort partitioner matching the configured layout optimization strategy is built: ZORDER/HILBERT use
     * a spatial-curve partitioner, LINEAR a custom-columns sort partitioner. Otherwise the factory default
     * partitioner is returned. The default is passed to {@code orElse}, so it is constructed in both cases.
     *
     * @throws UnsupportedOperationException if sort columns are given and the layout strategy is unknown
     */
    private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams, Schema schema, boolean isRowPartitioner) {
        Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key()))
            .map(listStr -> listStr.split(","));
        return orderByColumnsOpt.map(orderByColumns -> {
            HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = this.getWriteConfig().getLayoutOptimizationStrategy();
            switch (layoutOptStrategy) {
                case ZORDER:
                case HILBERT:
                    return isRowPartitioner
                        ? new RowSpatialCurveSortPartitioner(this.getWriteConfig())
                        : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) this.getEngineContext(), orderByColumns, layoutOptStrategy,
                            this.getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), this.recordType);
                case LINEAR:
                    return isRowPartitioner
                        ? new RowCustomColumnsSortPartitioner(orderByColumns)
                        : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
                            this.getWriteConfig().isConsistentLogicalTimestampEnabled());
                default:
                    throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
            }
        }).orElse(isRowPartitioner
            ? BulkInsertInternalPartitionerWithRowsFactory.get(this.getWriteConfig(), this.getHoodieTable().isPartitioned(), true)
            : BulkInsertInternalPartitionerFactory.get(this.getHoodieTable(), this.getWriteConfig(), true));
    }

    /**
     * Asynchronously reads one group as an RDD of records and hands it to
     * {@link #performClusteringWithRecordsRDD}.
     */
    private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
            HoodieData<HoodieRecord<T>> inputRecords = this.readRecordsForGroup(jsc, clusteringGroup, instantTime);
            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.getWriteConfig().getSchema()));
            List<HoodieFileGroupId> inputFileIds = collectInputFileGroupIds(clusteringGroup);
            return this.performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata, clusteringGroup.getExtraMetadata());
        });
    }

    /**
     * Asynchronously reads one group as a {@code Dataset<Row>} and hands it to
     * {@link #performClusteringWithRecordsAsRow}.
     */
    private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean shouldPreserveHoodieMetadata, String instantTime) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
            Dataset<Row> inputRecords = this.readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(this.getWriteConfig().getSchema()));
            List<HoodieFileGroupId> inputFileIds = collectInputFileGroupIds(clusteringGroup);
            return this.performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata, clusteringGroup.getExtraMetadata());
        });
    }

    /** Maps each slice of the group to the id of the file group it belongs to. */
    private static List<HoodieFileGroupId> collectInputFileGroupIds(HoodieClusteringGroup clusteringGroup) {
        return clusteringGroup.getSlices().stream()
            .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
            .collect(Collectors.toList());
    }

    /**
     * Reads all records of a group. If any slice has log files, base and log files are merged;
     * otherwise only the base files are read.
     */
    private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
        boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
        if (hasLogFiles) {
            return this.readRecordsForGroupWithLogs(jsc, clusteringOps, instantTime);
        }
        return this.readRecordsForGroupBaseFiles(jsc, clusteringOps);
    }

    /**
     * Reads file slices that have log files by merging each slice's base file (if any) with its log
     * files, up to {@code instantTime}. One Spark partition is created per clustering operation.
     *
     * @throws HoodieClusteringException (inside the Spark task) if a slice cannot be read
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps, String instantTime) {
        // Copy everything the Spark closure needs into locals: touching an instance field (such as
        // recordType) inside the lambda would capture and serialize this whole strategy object.
        HoodieWriteConfig config = this.getWriteConfig();
        HoodieTable table = this.getHoodieTable();
        HoodieRecord.HoodieRecordType recordType = this.recordType;
        return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
                LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
                try {
                    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
                    HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                            .withFileSystem(table.getMetaClient().getFs())
                            .withBasePath(table.getMetaClient().getBasePath())
                            .withLogFilePaths(clusteringOp.getDeltaFilePaths()))
                        .withReaderSchema(readerSchema)
                        .withLatestInstantTime(instantTime)
                        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
                        .withReverseReader(config.getCompactionReverseLogReadEnabled())
                        .withBufferSize(config.getMaxDFSStreamBufferSize())
                        .withSpillableMapBasePath(config.getSpillableMapBasePath())
                        .withPartition(clusteringOp.getPartitionPath())
                        .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
                        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
                        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                        .withRecordMerger(config.getRecordMerger())
                        .build();
                    // A slice may consist of log files only, in which case there is no base file to read.
                    Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
                        ? Option.empty()
                        : Option.of(HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
                    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
                    // Without populated meta fields, record key and partition are derived from the configured fields.
                    recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getProps(),
                        tableConfig.populateMetaFields()
                            ? Option.empty()
                            : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()))));
                }
                catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(recordIterators);
        }));
    }

    /**
     * Reads file slices that consist of base files only. Each record is copied and wrapped into a
     * payload record; when meta fields are not populated, a key generator built from the write config
     * is used. One Spark partition is created per clustering operation.
     *
     * @throws HoodieClusteringException (inside the Spark task) if a base file cannot be read
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps) {
        // Copy everything the Spark closure needs into locals so the lambda does not capture `this`.
        SerializableConfiguration hadoopConf = new SerializableConfiguration(this.getHoodieTable().getHadoopConf());
        HoodieWriteConfig writeConfig = this.getWriteConfig();
        HoodieRecord.HoodieRecordType recordType = this.recordType;
        return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                try {
                    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
                    HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
                    Option<BaseKeyGenerator> keyGeneratorOp = writeConfig.populateMetaFields()
                        ? Option.empty()
                        : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
                    MappingIterator mappingIterator = new MappingIterator(baseFileReader.getRecordIterator(readerSchema),
                        rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, writeConfig.getProps(), keyGeneratorOp));
                    iteratorsForPartition.add(mappingIterator);
                }
                catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(iteratorsForPartition);
        }));
    }

    /**
     * Reads a group as a {@code Dataset<Row>} through a Hudi relation configured for a snapshot query
     * as of {@code instantTime}. The read paths are the bootstrap/base files of every slice, plus the
     * log files when any slice has them.
     */
    private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
        boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
        SQLContext sqlContext = new SQLContext(jsc.sc());

        Path[] baseFilePaths = clusteringOps.stream()
            .map(op -> {
                List<String> readPaths = new ArrayList<>();
                if (op.getBootstrapFilePath() != null) {
                    readPaths.add(op.getBootstrapFilePath());
                }
                if (op.getDataFilePath() != null) {
                    readPaths.add(op.getDataFilePath());
                }
                return readPaths;
            })
            .flatMap(Collection::stream)
            .filter(path -> !path.isEmpty())
            .map(Path::new)
            .toArray(Path[]::new);

        HashMap<String, String> params = new HashMap<>();
        params.put("hoodie.datasource.query.type", "snapshot");
        params.put(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), instantTime);

        Path[] paths;
        if (hasLogFiles) {
            // Forward the configured memory fraction (default "0.75") used when log files must be merged.
            String compactionFraction = Option.ofNullable(this.getWriteConfig().getString("compaction.memory.fraction")).orElse("0.75");
            params.put("compaction.memory.fraction", compactionFraction);
            Path[] deltaPaths = clusteringOps.stream()
                .filter(op -> !op.getDeltaFilePaths().isEmpty())
                .flatMap(op -> op.getDeltaFilePaths().stream())
                .map(Path::new)
                .toArray(Path[]::new);
            paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
        } else {
            paths = baseFilePaths;
        }

        String readPathString = Arrays.stream(paths).map(Path::toString).collect(Collectors.joining(","));
        params.put("hoodie.datasource.read.paths", readPathString);
        params.put("glob.paths", readPathString);
        // A null schema is passed; presumably the relation resolves it from the table (verify in SparkAdapter).
        BaseRelation relation = SparkAdapterSupport$.MODULE$.sparkAdapter().createRelation(sqlContext, this.getHoodieTable().getMetaClient(), null, paths, params);
        return sqlContext.baseRelationToDataFrame(relation);
    }

    /**
     * Materializes a stream of per-group write-status RDDs into the array form that
     * {@code JavaSparkContext#union} accepts.
     */
    @SuppressWarnings("unchecked")
    private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
        // Java cannot instantiate a generic array, so collect into a raw JavaRDD[]; every element is a
        // JavaRDD<WriteStatus>, which makes the unchecked conversion on return safe.
        return writeStatusRDDStream.toArray(JavaRDD[]::new);
    }
}

