/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieSparkIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Clustering execution strategy that runs every clustering group of a plan as its own Spark job.
 *
 * <p>The groups of a plan are submitted concurrently on a bounded thread pool, and the write statuses
 * they produce are unioned into a single result. Subclasses decide how the records of one group are
 * rewritten by implementing {@link #performClusteringWithRecordsRDD} (record path) and
 * {@link #performClusteringWithRecordsAsRow} (Spark row-writer path).
 *
 * @param <T> type parameter of the {@link HoodieRecord}s being clustered
 */
public abstract class MultipleSparkJobExecutionStrategy<T>
extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(MultipleSparkJobExecutionStrategy.class);

    /** Write-config key that switches group execution to the Spark row-writer path. */
    private static final String ROW_WRITER_ENABLE_KEY = "hoodie.datasource.write.row.writer.enable";

    public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Executes every input group of {@code clusteringPlan} and returns the combined write statuses.
     *
     * <p>Groups run asynchronously on a dedicated fixed-size pool. The pool size is the number of groups
     * capped by {@link HoodieWriteConfig#getClusteringMaxParallelism()}, and is never less than one.
     * When {@code hoodie.datasource.write.row.writer.enable} is {@code true}, each group is read and
     * written as Spark rows; otherwise it is processed as {@link HoodieRecord}s. The pool is shut down
     * before this method returns, whether or not clustering succeeded.
     *
     * @param clusteringPlan plan listing the groups to cluster and the strategy parameters
     * @param schema         not used by this implementation; the reader schema is derived from the write config
     * @param instantTime    instant time of this clustering operation
     * @return metadata whose write statuses are the union of every group's result (empty if the plan has no groups)
     */
    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
        boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
        boolean useRowWriter = getWriteConfig().getBooleanOrDefault(ROW_WRITER_ENABLE_KEY, false);
        // newFixedThreadPool rejects a size of 0, which an empty plan or a non-positive max parallelism would produce.
        int poolSize = Math.max(1, Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()));
        ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(poolSize, new CustomizedThreadFactory("clustering-job-group", true));
        try {
            List<CompletableFuture<HoodieData<WriteStatus>>> groupFutures = clusteringPlan.getInputGroups().stream()
                    .map(inputGroup -> useRowWriter
                            ? runClusteringForGroupAsyncAsRow(inputGroup, clusteringPlan.getStrategy().getStrategyParams(), shouldPreserveMetadata, instantTime, clusteringExecutorService)
                            : runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams(), shouldPreserveMetadata, instantTime, clusteringExecutorService))
                    .collect(Collectors.toList());
            Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(groupFutures).join().stream();
            JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
            // union needs at least one RDD; a plan without groups yields an empty result instead.
            JavaRDD<WriteStatus> writeStatusRDD = writeStatuses.length == 0
                    ? engineContext.emptyRDD()
                    : engineContext.union(writeStatuses);
            HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
            writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
            return writeMetadata;
        } finally {
            clusteringExecutorService.shutdown();
        }
    }

    /**
     * Rewrites the rows of a single clustering group (row-writer path).
     *
     * @param inputRecords                 rows read from the group's file slices
     * @param numOutputGroups              number of output file groups requested by the plan for this group
     * @param instantTime                  instant time of this clustering operation
     * @param strategyParams               strategy parameters from the clustering plan
     * @param schema                       write schema with the Hudi metadata fields added
     * @param fileGroupIdList              ids of the file groups read as input
     * @param shouldPreserveHoodieMetadata whether the plan asks to preserve Hudi metadata
     * @param extraMetadata                extra metadata attached to the clustering group
     * @return write statuses for the files written for this group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata);

    /**
     * Rewrites the records of a single clustering group (record path).
     *
     * @param inputRecords                 records read from the group's file slices
     * @param numOutputGroups              number of output file groups requested by the plan for this group
     * @param instantTime                  instant time of this clustering operation
     * @param strategyParams               strategy parameters from the clustering plan
     * @param schema                       write schema with the Hudi metadata fields added
     * @param fileGroupIdList              ids of the file groups read as input
     * @param shouldPreserveHoodieMetadata whether the plan asks to preserve Hudi metadata
     * @param extraMetadata                extra metadata attached to the clustering group
     * @return write statuses for the files written for this group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata);

    /** Returns the bulk-insert partitioner to use on the row-writer path. */
    protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams, Schema schema) {
        return getPartitioner(strategyParams, schema, true);
    }

    /** Returns the bulk-insert partitioner to use on the record (RDD) path. */
    protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams, Schema schema) {
        return getPartitioner(strategyParams, schema, false);
    }

    /**
     * Chooses the bulk-insert partitioner for a clustering group.
     *
     * <p>When the strategy parameters contain comma-separated sort columns
     * ({@link HoodieClusteringConfig#PLAN_STRATEGY_SORT_COLUMNS}), a sort partitioner is chosen
     * according to the configured layout optimization strategy:
     * <ul>
     *   <li>ZORDER / HILBERT use a spatial-curve partitioner;</li>
     *   <li>LINEAR uses a custom-columns partitioner.</li>
     * </ul>
     * Otherwise the default bulk-insert partitioner from the matching factory is used.
     *
     * @param strategyParams   strategy parameters from the clustering plan
     * @param schema           write schema (the metadata fields are added for the RDD partitioners)
     * @param isRowPartitioner {@code true} for a {@code Dataset<Row>} partitioner, {@code false} for an RDD one
     * @return the selected partitioner
     * @throws UnsupportedOperationException if the layout optimization strategy is not handled
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams, Schema schema, boolean isRowPartitioner) {
        Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key()))
                .map(listStr -> listStr.split(","));
        return orderByColumnsOpt.map(orderByColumns -> {
            HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
            switch (layoutOptStrategy) {
                case ZORDER:
                case HILBERT:
                    return isRowPartitioner
                            ? new RowSpatialCurveSortPartitioner(getWriteConfig())
                            : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
                                    getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
                case LINEAR:
                    return isRowPartitioner
                            ? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
                            : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
                default:
                    throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
            }
        }).orElseGet(() -> isRowPartitioner
                ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)
                : BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));
    }

    /** Submits the record-path clustering of one group to {@code clusteringExecutorService}. */
    private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime, ExecutorService clusteringExecutorService) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
            HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
            return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
                    getReaderSchemaWithMetaFields(), getInputFileIds(clusteringGroup), preserveHoodieMetadata, clusteringGroup.getExtraMetadata());
        }, clusteringExecutorService);
    }

    /** Submits the row-writer clustering of one group to {@code clusteringExecutorService}. */
    private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean shouldPreserveHoodieMetadata, String instantTime, ExecutorService clusteringExecutorService) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
            Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
            return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
                    getReaderSchemaWithMetaFields(), getInputFileIds(clusteringGroup), shouldPreserveHoodieMetadata, clusteringGroup.getExtraMetadata());
        }, clusteringExecutorService);
    }

    /** Parses the write config's schema and adds the Hudi metadata fields to it. */
    private Schema getReaderSchemaWithMetaFields() {
        return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
    }

    /** Collects the file group id (partition path + file id) of every slice in {@code clusteringGroup}. */
    private static List<HoodieFileGroupId> getInputFileIds(HoodieClusteringGroup clusteringGroup) {
        return clusteringGroup.getSlices().stream()
                .map(slice -> new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId()))
                .collect(Collectors.toList());
    }

    /** Converts every slice of {@code clusteringGroup} into a {@link ClusteringOperation}. */
    private static List<ClusteringOperation> toClusteringOperations(HoodieClusteringGroup clusteringGroup) {
        return clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
    }

    /** Returns {@code true} if any of the operations has at least one delta (log) file. */
    private static boolean hasLogFiles(List<ClusteringOperation> clusteringOps) {
        return clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
    }

    /**
     * Reads the records of one group.
     *
     * <p>If any slice of the group has log files, records are merged through the log scanner.
     * Otherwise only the base files are read.
     */
    private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> clusteringOps = toClusteringOperations(clusteringGroup);
        return hasLogFiles(clusteringOps)
                ? readRecordsForGroupWithLogs(jsc, clusteringOps, instantTime)
                : readRecordsForGroupBaseFiles(jsc, clusteringOps);
    }

    /**
     * Reads the records of the given operations, merging base files with their log files.
     *
     * <p>Each operation becomes one Spark partition. For each operation, a
     * {@link HoodieMergedLogRecordScanner} is combined with the (optional) base-file reader in a
     * {@link HoodieFileSliceReader}.
     *
     * @throws HoodieClusteringException (at execution time) if an input file cannot be opened
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps, String instantTime) {
        // Resolve everything the executors need up-front so the closure captures these values.
        HoodieWriteConfig config = getWriteConfig();
        HoodieTable table = getHoodieTable();
        StorageConfiguration<?> storageConf = table.getStorageConf();
        HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
        String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
        Option<String[]> partitionFields = tableConfig.getPartitionFields();
        return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
                LOG.info("MaxMemoryPerCompaction run as part of clustering => {}", maxMemoryPerCompaction);
                try {
                    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
                    HoodieMergedLogRecordScanner scanner = ((HoodieMergedLogRecordScanner.Builder) HoodieMergedLogRecordScanner.newBuilder()
                            .withStorage(table.getStorage())
                            .withBasePath(table.getMetaClient().getBasePath())
                            .withLogFilePaths(clusteringOp.getDeltaFilePaths()))
                            .withReaderSchema(readerSchema)
                            .withLatestInstantTime(instantTime)
                            .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
                            .withReverseReader(config.getCompactionReverseLogReadEnabled())
                            .withBufferSize(config.getMaxDFSStreamBufferSize())
                            .withSpillableMapBasePath(config.getSpillableMapBasePath())
                            .withPartition(clusteringOp.getPartitionPath())
                            .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
                            .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
                            .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                            .withRecordMerger(config.getRecordMerger())
                            .withTableMetaClient(table.getMetaClient())
                            .build();
                    // A slice may consist of log files only, in which case there is no base file to read.
                    Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
                            ? Option.empty()
                            : Option.of(getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp));
                    recordIterators.add(new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), config.getRecordMerger(), tableConfig.getProps(),
                            tableConfig.populateMetaFields()
                                    ? Option.empty()
                                    : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()))));
                } catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(recordIterators);
        }));
    }

    /**
     * Reads the records of the given operations from their base files only.
     *
     * <p>Each operation becomes one Spark partition. Every record read is copied and wrapped into a
     * payload-based record using the configured key generator.
     *
     * @throws HoodieClusteringException (at execution time) if a base file cannot be opened
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps) {
        StorageConfiguration<?> storageConf = getHoodieTable().getStorageConf();
        HoodieWriteConfig config = getWriteConfig();
        HoodieTableConfig tableConfig = getHoodieTable().getMetaClient().getTableConfig();
        String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
        Option<String[]> partitionFields = tableConfig.getPartitionFields();
        return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
            List<Iterator<HoodieRecord<T>>> iteratorsForPartition = new ArrayList<>();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                try {
                    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
                    HoodieFileReader baseFileReader = getBaseOrBootstrapFileReader(storageConf, bootstrapBasePath, partitionFields, clusteringOp);
                    Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
                    // NOTE(review): records are copied before wrapping, presumably because the reader may reuse
                    // its underlying buffers between records — confirm against the reader implementation.
                    CloseableMappingIterator mappingIterator = new CloseableMappingIterator(
                            baseFileReader.getRecordIterator(readerSchema),
                            rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchema, config.getProps(), keyGeneratorOp));
                    iteratorsForPartition.add(mappingIterator);
                } catch (IOException e) {
                    throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e);
                }
            });
            return new ConcatenatingIterator<>(iteratorsForPartition);
        }));
    }

    /**
     * Opens a reader for the base file of {@code clusteringOp}.
     *
     * <p>The base-file reader is wrapped by a bootstrap reader that also reads the bootstrap file when
     * both of these hold:
     * <ul>
     *   <li>the operation carries a bootstrap file path;</li>
     *   <li>the table has a bootstrap base path.</li>
     * </ul>
     * For partitioned tables, the partition values are parsed from the bootstrap file's directory,
     * relative to {@code bootstrapBasePath}.
     *
     * @param storageConf       storage configuration used to build the storage instance
     * @param bootstrapBasePath bootstrap base path of the table, or {@code null} if none is configured
     * @param partitionFields   partition fields of the table, if any
     * @param clusteringOp      operation whose data file is opened; its data file path is expected to be set
     * @return reader over the base file, possibly combined with the bootstrap file
     * @throws IOException if a file reader cannot be created
     */
    private HoodieFileReader getBaseOrBootstrapFileReader(StorageConfiguration<?> storageConf, String bootstrapBasePath, Option<String[]> partitionFields, ClusteringOperation clusteringOp) throws IOException {
        StoragePath dataFilePath = new StoragePath(clusteringOp.getDataFilePath());
        HoodieHadoopStorage storage = new HoodieHadoopStorage(dataFilePath, storageConf);
        HoodieFileReader baseFileReader = HoodieSparkIOFactory.getHoodieSparkIOFactory(storage).getReaderFactory(recordType).getFileReader(writeConfig, dataFilePath);
        String bootstrapFilePath = clusteringOp.getBootstrapFilePath();
        if (StringUtils.nonEmpty(bootstrapFilePath) && StringUtils.nonEmpty(bootstrapBasePath)) {
            Object[] partitionValues = new Object[0];
            if (partitionFields.isPresent()) {
                // The partition directory is the part between "<bootstrapBasePath>/" and the last '/' (the file name).
                // NOTE(review): assumes bootstrapFilePath lies under bootstrapBasePath.
                int startOfPartitionPath = bootstrapFilePath.indexOf(bootstrapBasePath) + bootstrapBasePath.length() + 1;
                String partitionFilePath = bootstrapFilePath.substring(startOfPartitionPath, bootstrapFilePath.lastIndexOf("/"));
                partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), storageConf.unwrapAs(Configuration.class));
            }
            HoodieFileReader bootstrapFileReader = HoodieSparkIOFactory.getHoodieSparkIOFactory(storage).getReaderFactory(recordType).getFileReader(writeConfig, new StoragePath(bootstrapFilePath));
            baseFileReader = HoodieSparkIOFactory.getHoodieSparkIOFactory(storage).getReaderFactory(recordType).newBootstrapFileReader(baseFileReader, bootstrapFileReader, partitionFields, partitionValues);
        }
        return baseFileReader;
    }

    /**
     * Reads the records of one group as a Spark {@link Dataset} through a snapshot relation.
     *
     * <p>The relation is built over the group's base files and, if any slice has log files, their log
     * files too. It reads as of {@code instantTime}. When log files are present,
     * {@code compaction.memory.fraction} is taken from the write config (default {@code "0.75"}).
     */
    private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> clusteringOps = toClusteringOperations(clusteringGroup);
        SQLContext sqlContext = new SQLContext(jsc.sc());

        StoragePath[] baseFilePaths = clusteringOps.stream()
                .map(ClusteringOperation::getDataFilePath)
                .filter(StringUtils::nonEmpty)
                .map(StoragePath::new)
                .toArray(StoragePath[]::new);

        HashMap<String, String> params = new HashMap<>();
        params.put("hoodie.datasource.query.type", "snapshot");
        params.put(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), instantTime);

        StoragePath[] paths;
        if (hasLogFiles(clusteringOps)) {
            String compactionFraction = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction")).orElse("0.75");
            params.put("compaction.memory.fraction", compactionFraction);
            StoragePath[] deltaPaths = clusteringOps.stream()
                    .flatMap(op -> op.getDeltaFilePaths().stream())
                    .map(StoragePath::new)
                    .toArray(StoragePath[]::new);
            paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
        } else {
            paths = baseFilePaths;
        }

        String readPathString = Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.joining(","));
        String globPathString = Arrays.stream(paths).map(StoragePath::getParent).map(StoragePath::toString).distinct().collect(Collectors.joining(","));
        params.put("hoodie.datasource.read.paths", readPathString);
        params.put("glob.paths", globPathString);

        BaseRelation relation = SparkAdapterSupport$.MODULE$.sparkAdapter().createRelation(sqlContext, getHoodieTable().getMetaClient(), null, paths, params);
        return sqlContext.baseRelationToDataFrame(relation);
    }

    /**
     * Collects the stream into an array suitable for {@link JavaSparkContext#union}.
     *
     * <p>Generic arrays cannot be instantiated directly, so a raw {@code JavaRDD[]} is created. It is
     * returned through an unchecked conversion, which is safe because every element is a
     * {@code JavaRDD<WriteStatus>}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
        JavaRDD[] writeStatusRDDArray = writeStatusRDDStream.toArray(JavaRDD[]::new);
        return writeStatusRDDArray;
    }
}

