/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.CloseableIteratorListener;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Clustering execution strategy that processes every input group of a clustering plan as its own
 * asynchronous task and unions the per-group write statuses into a single result.
 *
 * <p>Each group is read either as a {@code Dataset<Row>} or as {@code HoodieData<HoodieRecord<T>>},
 * depending on {@code hoodie.datasource.write.row.writer.enable}, and is then handed to the matching
 * abstract {@code performClusteringWithRecords*} hook implemented by subclasses.
 *
 * <p>Several lambdas and the anonymous function used for reading reference instance members of this
 * class and therefore capture the enclosing instance.
 *
 * @param <T> payload type of the {@link HoodieRecord}s handled by the RDD-based path
 */
public abstract class MultipleSparkJobExecutionStrategy<T>
extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(MultipleSparkJobExecutionStrategy.class);

    /**
     * Creates the strategy; all arguments are forwarded unchanged to the superclass constructor.
     *
     * @param table         table being clustered
     * @param engineContext engine context used to obtain the Spark context and reader factories
     * @param writeConfig   write configuration consulted for parallelism, schema and layout settings
     */
    public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Runs clustering for every input group of {@code clusteringPlan} and returns the union of the
     * resulting write statuses.
     *
     * <p>Groups are submitted to a fixed thread pool sized to
     * {@code min(number of input groups, writeConfig.getClusteringMaxParallelism())}. This method blocks
     * until all groups have completed and always requests an orderly shutdown of the pool before
     * returning or propagating a failure.
     *
     * @param clusteringPlan plan supplying the input groups, strategy parameters and the
     *                       "preserve metadata" flag (treated as {@code true} when absent)
     * @param schema         not read by this implementation
     * @param instantTime    instant time forwarded to the per-group readers and writers
     * @return write metadata whose write statuses are the union of all per-group results
     */
    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
        boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
        List<HoodieClusteringGroup> inputGroups = clusteringPlan.getInputGroups();
        ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
            Math.min(inputGroups.size(), this.writeConfig.getClusteringMaxParallelism()),
            new CustomizedThreadFactory("clustering-job-group", true));
        try {
            boolean canUseRowWriter = getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true);
            Map<String, String> strategyParams = clusteringPlan.getStrategy().getStrategyParams();

            // Kick off one future per input group; the row-based path is used when the row writer is enabled.
            List<CompletableFuture<HoodieData<WriteStatus>>> groupFutures = inputGroups.stream()
                .map(inputGroup -> canUseRowWriter
                    ? runClusteringForGroupAsyncAsRow(inputGroup, strategyParams, shouldPreserveMetadata, instantTime, clusteringExecutorService)
                    : runClusteringForGroupAsync(inputGroup, strategyParams, shouldPreserveMetadata, instantTime, clusteringExecutorService))
                .collect(Collectors.toList());

            // join() blocks until every group finished and rethrows the failure of any group.
            List<HoodieData<WriteStatus>> groupWriteStatuses = FutureUtils.allOf(groupFutures).join();

            JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(groupWriteStatuses.stream().map(HoodieJavaRDD::getJavaRDD));
            JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);

            HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
            writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
            return writeMetadata;
        } finally {
            clusteringExecutorService.shutdown();
        }
    }

    /**
     * Writes the records of one clustering group, supplied as a {@code Dataset<Row>}.
     *
     * @param inputRecords                 rows read from the group's file slices
     * @param numOutputGroups              the group's configured number of output file groups
     * @param instantTime                  instant time of the clustering operation
     * @param strategyParams               strategy parameters taken from the clustering plan
     * @param schema                       table schema including metadata fields
     * @param fileGroupIdList              ids of the file groups that were read as input
     * @param shouldPreserveHoodieMetadata "preserve metadata" flag taken from the clustering plan
     * @param extraMetadata                extra metadata attached to the clustering group
     * @return write statuses produced for this group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata);

    /**
     * Writes the records of one clustering group, supplied as {@code HoodieData<HoodieRecord<T>>}.
     *
     * @param inputRecords                 records read from the group's file slices
     * @param numOutputGroups              the group's configured number of output file groups
     * @param instantTime                  instant time of the clustering operation
     * @param strategyParams               strategy parameters taken from the clustering plan
     * @param schema                       write-config schema including metadata fields
     * @param fileGroupIdList              ids of the file groups that were read as input
     * @param shouldPreserveHoodieMetadata "preserve metadata" flag taken from the clustering plan
     * @param extraMetadata                extra metadata attached to the clustering group
     * @return write statuses produced for this group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime, Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList, boolean shouldPreserveHoodieMetadata, Map<String, String> extraMetadata);

    /**
     * Returns the partitioner to use when the group's records are a {@code Dataset<Row>}.
     *
     * @param strategyParams strategy parameters; may contain the sort-columns setting
     * @param schema         schema handed to {@code getPartitioner}, which uses it only for RDD-based partitioners
     * @return row-based bulk-insert partitioner
     */
    protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams, Schema schema) {
        return getPartitioner(strategyParams, schema, true);
    }

    /**
     * Returns the partitioner to use when the group's records are a {@code JavaRDD<HoodieRecord<T>>}.
     *
     * @param strategyParams strategy parameters; may contain the sort-columns setting
     * @param schema         schema to which metadata fields are added for sort-based partitioners
     * @return RDD-based bulk-insert partitioner
     */
    protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams, Schema schema) {
        return getPartitioner(strategyParams, schema, false);
    }

    /**
     * Selects a bulk-insert partitioner.
     *
     * <p>When {@code strategyParams} has no value for
     * {@link HoodieClusteringConfig#PLAN_STRATEGY_SORT_COLUMNS}, the default partitioner is obtained from
     * the internal partitioner factories. Otherwise the value is split on {@code ","} and the configured
     * layout optimization strategy decides between a spatial-curve (ZORDER/HILBERT) and a custom-column
     * (LINEAR) sort partitioner.
     *
     * @param strategyParams   strategy parameters of the clustering plan
     * @param schema           schema to which metadata fields are added for RDD-based sort partitioners
     * @param isRowPartitioner {@code true} for the row-based variant, {@code false} for the RDD-based one
     * @param <I>              input type of the partitioner; chosen by the caller and not verifiable here
     * @return the selected partitioner
     * @throws UnsupportedOperationException if the layout optimization strategy is none of the handled values
     */
    // Raw types and one unchecked conversion are confined to this method: the concrete partitioner
    // classes have unrelated type arguments and the caller picks <I> via the isRowPartitioner flag.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams, Schema schema, boolean isRowPartitioner) {
        String sortColumns = strategyParams.get(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key());
        BulkInsertPartitioner partitioner;

        if (sortColumns == null) {
            // No sort columns requested: fall back to the default internal partitioner.
            if (isRowPartitioner) {
                partitioner = BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true);
            } else {
                partitioner = BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true);
            }
            return partitioner;
        }

        String[] orderByColumns = sortColumns.split(",");
        HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
        switch (layoutOptStrategy) {
            case ZORDER:
            case HILBERT:
                if (isRowPartitioner) {
                    partitioner = new RowSpatialCurveSortPartitioner(getWriteConfig());
                } else {
                    partitioner = new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
                        getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), this.recordType);
                }
                return partitioner;
            case LINEAR:
                if (isRowPartitioner) {
                    partitioner = new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig());
                } else {
                    partitioner = new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
                }
                return partitioner;
            default:
                throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
        }
    }

    /**
     * Submits clustering of a single group through the RDD-based path.
     *
     * <p>The task reads the group's records, derives the reader schema from the write-config schema
     * (with metadata fields added) and delegates to {@link #performClusteringWithRecordsRDD}.
     *
     * @param clusteringGroup           group to cluster
     * @param strategyParams            strategy parameters of the clustering plan
     * @param preserveHoodieMetadata    "preserve metadata" flag forwarded to the writer
     * @param instantTime               instant time of the clustering operation
     * @param clusteringExecutorService executor on which the task runs
     * @return future completing with the write statuses of this group
     */
    protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime, ExecutorService clusteringExecutorService) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
            HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
            Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
            List<HoodieFileGroupId> inputFileIds = toFileGroupIds(clusteringGroup);
            return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
                readerSchema, inputFileIds, preserveHoodieMetadata, clusteringGroup.getExtraMetadata());
        }, clusteringExecutorService);
    }

    /**
     * Submits clustering of a single group through the row-based path.
     *
     * <p>The task resolves the table schema (with metadata fields added), reads the group's records as
     * a {@code Dataset<Row>} and delegates to {@link #performClusteringWithRecordsAsRow}.
     *
     * @param clusteringGroup              group to cluster
     * @param strategyParams               strategy parameters of the clustering plan
     * @param shouldPreserveHoodieMetadata "preserve metadata" flag forwarded to the writer
     * @param instantTime                  instant time of the clustering operation
     * @param clusteringExecutorService    executor on which the task runs
     * @return future completing with the write statuses of this group; it completes exceptionally with a
     *         {@link HoodieException} as cause when the table schema cannot be resolved
     */
    protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean shouldPreserveHoodieMetadata, String instantTime, ExecutorService clusteringExecutorService) {
        return CompletableFuture.supplyAsync(() -> {
            JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
            Schema tableSchemaWithMetaFields = resolveTableSchemaWithMetaFields();
            Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime, tableSchemaWithMetaFields);
            List<HoodieFileGroupId> inputFileIds = toFileGroupIds(clusteringGroup);
            return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
                tableSchemaWithMetaFields, inputFileIds, shouldPreserveHoodieMetadata, clusteringGroup.getExtraMetadata());
        }, clusteringExecutorService);
    }

    /**
     * Resolves the table's Avro schema and adds the metadata fields to it.
     *
     * @return table schema including metadata fields
     * @throws HoodieException wrapping any failure raised while resolving the schema
     */
    private Schema resolveTableSchemaWithMetaFields() {
        try {
            Schema tableSchema = new TableSchemaResolver(getHoodieTable().getMetaClient()).getTableAvroSchema(false);
            return HoodieAvroUtils.addMetadataFields(tableSchema, getWriteConfig().allowOperationMetadataField());
        } catch (Exception e) {
            throw new HoodieException("Failed to get table schema during clustering", e);
        }
    }

    /** Maps every slice of the group to the id (partition path + file id) of its file group. */
    private static List<HoodieFileGroupId> toFileGroupIds(HoodieClusteringGroup clusteringGroup) {
        return clusteringGroup.getSlices().stream()
            .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
            .collect(Collectors.toList());
    }

    /** Converts every slice of the group into a {@link ClusteringOperation}. */
    private static List<ClusteringOperation> toClusteringOperations(HoodieClusteringGroup clusteringGroup) {
        return clusteringGroup.getSlices().stream()
            .map(ClusteringOperation::create)
            .collect(Collectors.toList());
    }

    /**
     * Reads all records of a clustering group as {@code HoodieData<HoodieRecord<T>>}.
     *
     * <p>The group's clustering operations are parallelized with
     * {@code min(writeConfig.getClusteringGroupReadParallelism(), number of operations)} partitions.
     * Within a partition, one iterator supplier per operation is registered and the suppliers are
     * concatenated lazily.
     *
     * @param jsc             Spark context used to parallelize the operations
     * @param clusteringGroup group whose slices are read
     * @param instantTime     instant time forwarded to the record iterator
     * @return records of the group
     */
    private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> clusteringOps = toClusteringOperations(clusteringGroup);
        int readParallelism = Math.min(this.writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());

        // The factory's record type cannot be checked from this class, hence the explicit unchecked narrowing.
        ReaderContextFactory<?> untypedReaderContextFactory = getEngineContext().getReaderContextFactory(getHoodieTable().getMetaClient());
        @SuppressWarnings("unchecked")
        ReaderContextFactory<T> readerContextFactory = (ReaderContextFactory<T>) untypedReaderContextFactory;

        JavaRDD<HoodieRecord<T>> recordsRDD = jsc.parallelize(clusteringOps, readParallelism).mapPartitions(clusteringOpsPartition -> {
            List<Supplier<ClosableIterator<HoodieRecord<T>>>> suppliers = new ArrayList<>();
            long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), getWriteConfig());
            LOG.info("MaxMemoryPerCompaction run as part of clustering => {}", maxMemoryPerCompaction);
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                // Only a supplier is registered here; the iterator itself is created when the supplier is invoked.
                Supplier<ClosableIterator<HoodieRecord<T>>> iteratorSupplier =
                    () -> getRecordIterator(readerContextFactory, clusteringOp, instantTime, maxMemoryPerCompaction);
                suppliers.add(iteratorSupplier);
            });
            return CloseableIteratorListener.addListener(new LazyConcatenatingIterator<>(suppliers));
        });
        return HoodieJavaRDD.of(recordsRDD);
    }

    /**
     * Reads all records of a clustering group as a {@code Dataset<Row>}.
     *
     * <p>The group's clustering operations are parallelized with one partition per operation. For each
     * operation a file group reader is created over the corresponding file slice, and its iterator of
     * {@link InternalRow}s is returned. The resulting RDD is turned into a data frame using the Spark
     * struct type converted from {@code tableSchemaWithMetaFields}.
     *
     * @param jsc                       Spark context used to parallelize the operations
     * @param clusteringGroup           group whose slices are read
     * @param instantTime               instant time forwarded to the file group reader
     * @param tableSchemaWithMetaFields reader schema; also the source of the data frame schema
     * @return rows of the group
     */
    private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, final String instantTime, Schema tableSchemaWithMetaFields) {
        List<ClusteringOperation> clusteringOps = toClusteringOperations(clusteringGroup);
        final String basePath = getWriteConfig().getBasePath();
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(getEngineContext().getTaskContextSupplier(), this.writeConfig);
        final TypedProperties readerProperties = getReaderProperties(maxMemoryPerCompaction);
        final boolean usePosition = getWriteConfig().getBooleanOrDefault(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS);
        final boolean enableLogBlocksScan = getWriteConfig().enableOptimizedLogBlocksScan();
        final String internalSchemaStr = getWriteConfig().getInternalSchema();
        // The Avro schema is wrapped in SerializableSchema before being captured by the function below.
        final SerializableSchema serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields);
        final HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();

        // The factory's record type cannot be checked from this class, hence the explicit unchecked narrowing.
        ReaderContextFactory<?> untypedReaderContextFactory = getEngineContext().getReaderContextFactory(metaClient);
        @SuppressWarnings("unchecked")
        final ReaderContextFactory<InternalRow> readerContextFactory = (ReaderContextFactory<InternalRow>) untypedReaderContextFactory;

        StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields);
        RDD<InternalRow> internalRowRDD = jsc.parallelize(clusteringOps, clusteringOps.size()).flatMap(new FlatMapFunction<ClusteringOperation, InternalRow>() {

            @Override
            public Iterator<InternalRow> call(ClusteringOperation clusteringOperation) throws Exception {
                FileSlice fileSlice = MultipleSparkJobExecutionStrategy.this.clusteringOperationToFileSlice(basePath, clusteringOperation);
                Schema readerSchema = serializableTableSchemaWithMetaFields.get();
                HoodieFileGroupReader<InternalRow> fileGroupReader = MultipleSparkJobExecutionStrategy.getFileGroupReader(
                    metaClient, fileSlice, readerSchema, SerDeHelper.fromJson(internalSchemaStr), readerContextFactory,
                    instantTime, readerProperties, usePosition, enableLogBlocksScan);
                // NOTE(review): the reader is not closed here; presumably the registered listener closes the iterator - confirm.
                return CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator());
            }
        }).rdd();

        return SparkAdapterSupport$.MODULE$.sparkAdapter().getUnsafeUtils().createDataFrameFromRDD(
            ((HoodieSparkEngineContext) getEngineContext()).getSqlContext().sparkSession(), internalRowRDD, sparkSchemaWithMetaFields);
    }

    /**
     * Collects a stream of write-status RDDs into an array.
     *
     * @param writeStatusRDDStream stream of per-group write-status RDDs
     * @return array holding the stream's elements in encounter order
     */
    // Java forbids creating an array of a parameterized type, so a raw JavaRDD[] is created and
    // converted unchecked; every element originates from a Stream<JavaRDD<WriteStatus>>.
    @SuppressWarnings("unchecked")
    private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
        return writeStatusRDDStream.toArray(JavaRDD[]::new);
    }
}

