/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.clustering.run.strategy;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieSparkIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.SparkBroadcastManager;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.HoodieDataTypeUtils;
import org.apache.spark.sql.HoodieUnsafeUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MultipleSparkJobExecutionStrategy<T>
extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(MultipleSparkJobExecutionStrategy.class);

    /**
     * Creates the strategy by handing the table, engine context and write configuration
     * straight through to the {@link ClusteringExecutionStrategy} constructor.
     *
     * @param table         table the clustering runs against
     * @param engineContext engine context; the methods of this class obtain a Spark context from it
     * @param writeConfig   write configuration consulted by the clustering methods of this class
     */
    public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
        super(table, engineContext, writeConfig);
    }

    /**
     * Runs clustering for every input group of the plan and returns the combined write statuses.
     *
     * <p>Each input group is submitted as its own asynchronous task to a fixed-size thread pool
     * sized to the smaller of the number of input groups and the configured maximum clustering
     * parallelism. The row-writer path is taken when the
     * {@code hoodie.datasource.write.row.writer.enable} setting (default {@code true}) is on and
     * {@link HoodieDataTypeUtils#canUseRowWriter} accepts the schema; otherwise the record (RDD)
     * path is used. The method blocks until all group tasks have completed, then unions the
     * per-group results into a single RDD. The pool is always shut down before returning.
     *
     * @param clusteringPlan plan providing the input groups, the strategy parameters and the
     *                       preserve-metadata flag (treated as {@code true} when absent)
     * @param schema         schema used to decide whether the row writer can be used
     * @param instantTime    instant time passed on to every group task
     * @return write metadata whose write statuses are the union of all groups' write statuses
     * @throws IllegalArgumentException if the computed pool size is not positive (for example a
     *                                  plan without input groups), as raised by
     *                                  {@link Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)}
     * @throws java.util.concurrent.CompletionException if any group task fails, as raised by
     *                                  {@link CompletableFuture#join()}
     */
    @Override
    public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
        JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(this.getEngineContext());
        boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
        int poolSize = Math.min(clusteringPlan.getInputGroups().size(), this.writeConfig.getClusteringMaxParallelism());
        ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(poolSize, new CustomizedThreadFactory("clustering-job-group", true));
        try {
            boolean canUseRowWriter = this.getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)
                && HoodieDataTypeUtils.canUseRowWriter(schema, engineContext.hadoopConfiguration());
            if (canUseRowWriter) {
                HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(this.writeConfig.getProps(), schema);
            }
            // Submit one asynchronous task per input group; all of them share the pool created above.
            List<CompletableFuture<HoodieData<WriteStatus>>> groupFutures = clusteringPlan.getInputGroups().stream().map(inputGroup -> {
                if (canUseRowWriter) {
                    return this.runClusteringForGroupAsyncAsRow(inputGroup, clusteringPlan.getStrategy().getStrategyParams(), shouldPreserveMetadata, instantTime, clusteringExecutorService);
                }
                return this.runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams(), shouldPreserveMetadata, instantTime, clusteringExecutorService);
            }).collect(Collectors.toList());
            // join() blocks until every group task has finished (or one of them has failed).
            Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(groupFutures).join().stream();
            JavaRDD<WriteStatus>[] writeStatuses = this.convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
            JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
            HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
            writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
            return writeMetadata;
        }
        finally {
            clusteringExecutorService.shutdown();
        }
    }

    /**
     * Clusters one input group whose records were read as a {@code Dataset<Row>}; implemented by subclasses.
     *
     * <p>The declared parameter names carry no meaning; the descriptions below are taken from the
     * call made by {@code runClusteringForGroupAsyncAsRow} in this class.
     *
     * @param var1 input rows read for the clustering group
     * @param var2 number of output file groups recorded on the clustering group
     * @param var3 instant time of the clustering operation
     * @param var4 strategy parameters from the clustering plan
     * @param var5 table Avro schema with the Hudi metadata fields added
     * @param var6 ids (partition path and file id) of the file groups the input was read from
     * @param var7 whether Hudi metadata should be preserved
     * @param var8 extra metadata recorded on the clustering group
     * @return write statuses produced for this group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> var1, int var2, String var3, Map<String, String> var4, Schema var5, List<HoodieFileGroupId> var6, boolean var7, Map<String, String> var8);

    /**
     * Clusters one input group whose records were read as {@code HoodieData<HoodieRecord<T>>}; implemented by subclasses.
     *
     * <p>The declared parameter names carry no meaning; the descriptions below are taken from the
     * call made by {@code runClusteringForGroupAsync} in this class.
     *
     * @param var1 input records read for the clustering group
     * @param var2 number of output file groups recorded on the clustering group
     * @param var3 instant time of the clustering operation
     * @param var4 strategy parameters from the clustering plan
     * @param var5 schema parsed from the write configuration, with the Hudi metadata fields added
     * @param var6 ids (partition path and file id) of the file groups the input was read from
     * @param var7 whether Hudi metadata should be preserved
     * @param var8 extra metadata recorded on the clustering group
     * @return write statuses produced for this group
     */
    public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> var1, int var2, String var3, Map<String, String> var4, Schema var5, List<HoodieFileGroupId> var6, boolean var7, Map<String, String> var8);

    /**
     * Resolves the partitioner for the row-based ({@code Dataset<Row>}) clustering path.
     *
     * @param strategyParams strategy parameters, consulted for the sort-columns setting
     * @param schema         schema forwarded to the partitioner lookup
     * @return the row partitioner chosen by {@code getPartitioner} with the row flag set
     */
    protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams, Schema schema) {
        final boolean rowBased = true;
        BulkInsertPartitioner<Dataset<Row>> rowPartitioner = getPartitioner(strategyParams, schema, rowBased);
        return rowPartitioner;
    }

    /**
     * Resolves the partitioner for the record-based ({@code JavaRDD<HoodieRecord<T>>}) clustering path.
     *
     * @param strategyParams strategy parameters, consulted for the sort-columns setting
     * @param schema         schema forwarded to the partitioner lookup
     * @return the RDD partitioner chosen by {@code getPartitioner} with the row flag cleared
     */
    protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams, Schema schema) {
        final boolean rowBased = false;
        BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> rddPartitioner = getPartitioner(strategyParams, schema, rowBased);
        return rddPartitioner;
    }

    /**
     * Picks the bulk-insert partitioner for a clustering group.
     *
     * <p>When the strategy parameters contain the sort-columns setting
     * ({@code HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS}), its value is split on commas and
     * the partitioner is chosen from the configured layout optimization strategy: a spatial-curve
     * partitioner for ZORDER/HILBERT, a custom-columns sort partitioner for LINEAR. Without sort
     * columns the default internal bulk-insert partitioner factories are used.
     *
     * @param strategyParams   strategy parameters from the clustering plan
     * @param schema           schema; the RDD partitioners receive it with metadata fields added
     * @param isRowPartitioner {@code true} to build the row-based variant, {@code false} for the RDD variant
     * @param <I>              input type the returned partitioner operates on
     * @return the selected partitioner
     * @throws UnsupportedOperationException if sort columns are set and the layout optimization
     *                                       strategy is none of ZORDER, HILBERT or LINEAR
     */
    private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams, Schema schema, boolean isRowPartitioner) {
        // Column names are split on "," as-is; no trimming is applied to the pieces here.
        Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key())).map(listStr -> listStr.split(","));
        return orderByColumnsOpt.map(orderByColumns -> {
            HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = this.getWriteConfig().getLayoutOptimizationStrategy();
            switch (layoutOptStrategy) {
                case ZORDER: 
                case HILBERT: {
                    // Note: the row-based spatial-curve partitioner is built from the write config only.
                    return isRowPartitioner ? new RowSpatialCurveSortPartitioner(this.getWriteConfig()) : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext)this.getEngineContext(), (String[])orderByColumns, layoutOptStrategy, this.getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), this.recordType);
                }
                case LINEAR: {
                    return isRowPartitioner ? new RowCustomColumnsSortPartitioner((String[])orderByColumns, this.getWriteConfig()) : new RDDCustomColumnsSortPartitioner((String[])orderByColumns, HoodieAvroUtils.addMetadataFields(schema), this.getWriteConfig());
                }
            }
            // Reached only for a strategy value not handled by the switch above.
            throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", new Object[]{layoutOptStrategy}));
        // No sort columns configured: fall back to the default internal partitioners.
        }).orElseGet(() -> isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get(this.getWriteConfig(), this.getHoodieTable().isPartitioned(), true) : BulkInsertInternalPartitionerFactory.get(this.getHoodieTable(), this.getWriteConfig(), true));
    }

    /**
     * Schedules record-based (RDD) clustering of a single input group on the given executor.
     *
     * <p>The task reads the group's records, parses the write-config schema and adds the Hudi
     * metadata fields to it, collects the ids of the input file groups, and delegates to
     * {@code performClusteringWithRecordsRDD}.
     *
     * @param clusteringGroup           group to cluster
     * @param strategyParams            strategy parameters from the clustering plan
     * @param preserveHoodieMetadata    whether Hudi metadata should be preserved
     * @param instantTime               instant time of the clustering operation
     * @param clusteringExecutorService executor the task is submitted to
     * @return future completing with the group's write statuses
     */
    private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime, ExecutorService clusteringExecutorService) {
        Supplier<HoodieData<WriteStatus>> groupTask = () -> {
            JavaSparkContext sparkContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
            HoodieData<HoodieRecord<T>> groupRecords = readRecordsForGroup(sparkContext, clusteringGroup, instantTime);

            String writeSchemaJson = getWriteConfig().getSchema();
            Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeSchemaJson));

            // One file group id per slice of the clustering group.
            List<HoodieFileGroupId> sourceFileGroupIds = new ArrayList<>();
            clusteringGroup.getSlices().forEach(slice ->
                sourceFileGroupIds.add(new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId())));

            return performClusteringWithRecordsRDD(
                groupRecords,
                clusteringGroup.getNumOutputFileGroups(),
                instantTime,
                strategyParams,
                schemaWithMetaFields,
                sourceFileGroupIds,
                preserveHoodieMetadata,
                clusteringGroup.getExtraMetadata());
        };
        return CompletableFuture.supplyAsync(groupTask, clusteringExecutorService);
    }

    /**
     * Schedules row-based ({@code Dataset<Row>}) clustering of a single input group on the given executor.
     *
     * <p>The task resolves the table Avro schema through {@link TableSchemaResolver}, adds the Hudi
     * metadata fields, reads the group's rows, collects the ids of the input file groups, and
     * delegates to {@code performClusteringWithRecordsAsRow}.
     *
     * @param clusteringGroup              group to cluster
     * @param strategyParams               strategy parameters from the clustering plan
     * @param shouldPreserveHoodieMetadata whether Hudi metadata should be preserved
     * @param instantTime                  instant time of the clustering operation
     * @param clusteringExecutorService    executor the task is submitted to
     * @return future completing with the group's write statuses; the task throws
     *         {@link HoodieException} when the table schema cannot be resolved
     */
    private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, boolean shouldPreserveHoodieMetadata, String instantTime, ExecutorService clusteringExecutorService) {
        Supplier<HoodieData<WriteStatus>> groupTask = () -> {
            JavaSparkContext sparkContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());

            Schema schemaWithMetaFields;
            try {
                Schema tableSchema = new TableSchemaResolver(getHoodieTable().getMetaClient()).getTableAvroSchema(false);
                schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema, getWriteConfig().allowOperationMetadataField());
            } catch (Exception e) {
                throw new HoodieException("Failed to get table schema during clustering", e);
            }

            Dataset<Row> groupRows = readRecordsForGroupAsRow(sparkContext, clusteringGroup, instantTime, schemaWithMetaFields);

            // One file group id per slice of the clustering group.
            List<HoodieFileGroupId> sourceFileGroupIds = new ArrayList<>();
            clusteringGroup.getSlices().forEach(slice ->
                sourceFileGroupIds.add(new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId())));

            return performClusteringWithRecordsAsRow(
                groupRows,
                clusteringGroup.getNumOutputFileGroups(),
                instantTime,
                strategyParams,
                schemaWithMetaFields,
                sourceFileGroupIds,
                shouldPreserveHoodieMetadata,
                clusteringGroup.getExtraMetadata());
        };
        return CompletableFuture.supplyAsync(groupTask, clusteringExecutorService);
    }

    /**
     * Reads all records of a clustering group, choosing the reader by whether any slice has log files.
     *
     * @param jsc             Spark context used to distribute the read
     * @param clusteringGroup group whose slices are read
     * @param instantTime     instant time, used only by the log-file-aware read path
     * @return the group's records
     */
    private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
        List<ClusteringOperation> operations = clusteringGroup.getSlices().stream()
            .map(ClusteringOperation::create)
            .collect(Collectors.toList());

        // Stop at the first operation that carries at least one delta (log) file.
        boolean anyLogFiles = false;
        for (ClusteringOperation operation : operations) {
            if (!operation.getDeltaFilePaths().isEmpty()) {
                anyLogFiles = true;
                break;
            }
        }

        return anyLogFiles
            ? readRecordsForGroupWithLogs(jsc, operations, instantTime)
            : readRecordsForGroupBaseFiles(jsc, operations);
    }

    /**
     * Reads the records of clustering operations that include log files.
     *
     * <p>The operations are distributed over at most {@code getClusteringGroupReadParallelism()}
     * Spark partitions (never more than the number of operations). Inside each partition one
     * supplier per operation is collected; the suppliers are handed to a
     * {@link LazyConcatenatingIterator}, so the base/bootstrap reader and the record iterator of
     * an operation are only created when its supplier is invoked.
     *
     * @param jsc           Spark context used to parallelize the operations
     * @param clusteringOps operations to read
     * @param instantTime   instant time passed to the log-file-aware record iterator
     * @return the records of all given operations
     */
    private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps, String instantTime) {
        int readParallelism = Math.min(this.writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
        return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism).mapPartitions((FlatMapFunction & Serializable)clusteringOpsPartition -> {
            ArrayList suppliers = new ArrayList();
            // The memory budget and key generator are computed once per partition and shared by all its suppliers.
            long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), this.getWriteConfig());
            LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
            Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(this.writeConfig);
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                // Both the file reader and the iterator are created lazily, inside the supplier.
                Supplier<ClosableIterator> iteratorSupplier = () -> {
                    Option<HoodieFileReader> baseOrBootstrapFileReader = this.getBaseOrBootstrapFileReader((ClusteringOperation)clusteringOp);
                    return this.getRecordIteratorWithLogFiles((ClusteringOperation)clusteringOp, instantTime, maxMemoryPerCompaction, keyGeneratorOpt, baseOrBootstrapFileReader);
                };
                suppliers.add(iteratorSupplier);
            });
            return new LazyConcatenatingIterator(suppliers);
        }));
    }

    /**
     * Reads the records of clustering operations that consist of base files only.
     *
     * <p>The operations are distributed over at most {@code getClusteringGroupReadParallelism()}
     * Spark partitions (never more than the number of operations). Inside each partition one
     * supplier per operation is collected and handed to a {@link LazyConcatenatingIterator}.
     *
     * @param jsc           Spark context used to parallelize the operations
     * @param clusteringOps operations to read
     * @return the records of all given operations
     * @throws IllegalArgumentException presumably, via {@code ValidationUtils.checkArgument}, when an
     *                                  operation has no base file reader (raised inside the Spark
     *                                  task) -- TODO confirm the exception type
     */
    private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps) {
        int readParallelism = Math.min(this.writeConfig.getClusteringGroupReadParallelism(), clusteringOps.size());
        // Created once here (outside the Spark function) and captured by the lambda below.
        Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(this.writeConfig);
        return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, readParallelism).mapPartitions((FlatMapFunction & Serializable)clusteringOpsPartition -> {
            ArrayList iteratorGettersForPartition = new ArrayList();
            clusteringOpsPartition.forEachRemaining(clusteringOp -> {
                // The file reader is created eagerly for every operation of the partition;
                // only the record iterator itself is deferred to the supplier.
                Option<HoodieFileReader> baseOrBootstrapFileReader = this.getBaseOrBootstrapFileReader((ClusteringOperation)clusteringOp);
                ValidationUtils.checkArgument(baseOrBootstrapFileReader.isPresent(), "Base file reader must be present for clustering operation");
                Supplier<ClosableIterator> recordIteratorGetter = () -> this.getRecordIteratorWithBaseFileOnly(keyGeneratorOpt, (HoodieFileReader)baseOrBootstrapFileReader.get());
                iteratorGettersForPartition.add(recordIteratorGetter);
            });
            return new LazyConcatenatingIterator(iteratorGettersForPartition);
        }));
    }

    /**
     * Reads a clustering group as rows, preferring the file-group-reader based path.
     *
     * <p>The file group reader is used only when it is enabled in the write config, meta fields
     * are populated, and none of the group's operations has a bootstrap file path. Otherwise the
     * relation-based read is used.
     *
     * @param jsc                       Spark context
     * @param clusteringGroup           group whose slices are read
     * @param instantTime               instant time, used by the file-group-reader path
     * @param tableSchemaWithMetaFields table schema including metadata fields, used by the file-group-reader path
     * @return the group's records as a data frame
     */
    private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime, Schema tableSchemaWithMetaFields) {
        List<ClusteringOperation> operations = clusteringGroup.getSlices().stream()
            .map(ClusteringOperation::create)
            .collect(Collectors.toList());

        // Evaluated left to right with short-circuiting: config flags first, bootstrap scan last.
        boolean useFileGroupReader = getWriteConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
            && getWriteConfig().getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
            && operations.stream().noneMatch(operation -> !StringUtils.isNullOrEmpty(operation.getBootstrapFilePath()));

        return useFileGroupReader
            ? readRecordsForGroupAsRowWithFileGroupReader(jsc, instantTime, tableSchemaWithMetaFields, operations)
            : readRecordsForGroupAsRow(jsc, operations);
    }

    /**
     * Reads the given clustering operations as rows through a Hudi relation.
     *
     * <p>With log files present the relation is queried in {@code snapshot} mode over base and
     * log file paths and the compaction memory fraction is forwarded (falling back to
     * {@code "0.75"} when unset); otherwise it is queried in {@code read_optimized} mode over the
     * base file paths only.
     *
     * @param jsc           Spark context backing the SQL context
     * @param clusteringOps operations whose files are read
     * @return data frame over the relation created for the collected paths
     */
    private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, List<ClusteringOperation> clusteringOps) {
        boolean anyLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
        SQLContext sqlContext = new SQLContext(jsc.sc());

        // Base file paths of all operations, skipping missing (null) or empty entries.
        StoragePath[] baseFilePaths = clusteringOps.stream()
            .map(ClusteringOperation::getDataFilePath)
            .filter(dataFilePath -> dataFilePath != null && !dataFilePath.isEmpty())
            .map(StoragePath::new)
            .toArray(StoragePath[]::new);

        HashMap<String, String> params = new HashMap<String, String>();
        StoragePath[] paths;
        if (anyLogFiles) {
            params.put("hoodie.datasource.query.type", "snapshot");
            String configuredFraction = getWriteConfig().getString(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION);
            String compactionFraction = configuredFraction == null ? "0.75" : configuredFraction;
            params.put(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION.key(), compactionFraction);
            StoragePath[] deltaPaths = clusteringOps.stream()
                .flatMap(op -> op.getDeltaFilePaths().stream())
                .map(StoragePath::new)
                .toArray(StoragePath[]::new);
            paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
        } else {
            params.put("hoodie.datasource.query.type", "read_optimized");
            paths = baseFilePaths;
        }

        String readPathString = Arrays.stream(paths)
            .map(StoragePath::toString)
            .collect(Collectors.joining(","));
        // Distinct parent directories of every file being read.
        String globPathString = Arrays.stream(paths)
            .map(StoragePath::getParent)
            .map(StoragePath::toString)
            .distinct()
            .collect(Collectors.joining(","));
        params.put("hoodie.datasource.read.paths", readPathString);
        params.put("glob.paths", globPathString);

        BaseRelation relation = SparkAdapterSupport$.MODULE$.sparkAdapter().createRelation(sqlContext, getHoodieTable().getMetaClient(), null, paths, params);
        return sqlContext.baseRelationToDataFrame(relation);
    }

    /**
     * Reads the given clustering operations as rows using {@link HoodieFileGroupReader}.
     *
     * <p>The operations are parallelized with one Spark partition per operation. For each
     * operation a {@link FileSlice} is rebuilt, a file group reader is created from the broadcast
     * reader context and storage configuration, and its iterator is returned as the rows of that
     * operation. The resulting RDD is turned into a data frame with the Spark schema converted
     * from {@code tableSchemaWithMetaFields}.
     *
     * @param jsc                       Spark context used to parallelize the operations
     * @param instantTime               instant time passed to the file group reader
     * @param tableSchemaWithMetaFields Avro schema passed to the reader for both of its schema arguments
     * @param clusteringOps             operations to read
     * @return data frame over the rows of all operations
     */
    private Dataset<Row> readRecordsForGroupAsRowWithFileGroupReader(JavaSparkContext jsc, final String instantTime, Schema tableSchemaWithMetaFields, List<ClusteringOperation> clusteringOps) {
        final String basePath = this.getWriteConfig().getBasePath();
        final boolean usePosition = this.getWriteConfig().getBooleanOrDefault(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS);
        final String internalSchemaStr = this.getWriteConfig().getInternalSchema();
        final boolean isInternalSchemaPresent = !StringUtils.isNullOrEmpty(internalSchemaStr);
        // Wrapped so the schema can be captured by the Spark function below.
        final SerializableSchema serializableTableSchemaWithMetaFields = new SerializableSchema(tableSchemaWithMetaFields);
        final SparkBroadcastManager broadcastManager = new SparkBroadcastManager(this.getEngineContext(), this.getHoodieTable().getMetaClient());
        // Must run before the job below: the function retrieves the reader context and storage config from it.
        broadcastManager.prepareAndBroadcast();
        StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields);
        // NOTE(review): the anonymous function references MultipleSparkJobExecutionStrategy.this,
        // so the enclosing strategy instance is captured along with it.
        RDD internalRowRDD = jsc.parallelize(clusteringOps, clusteringOps.size()).flatMap((FlatMapFunction)new FlatMapFunction<ClusteringOperation, InternalRow>(){

            public Iterator<InternalRow> call(ClusteringOperation clusteringOperation) throws Exception {
                FileSlice fileSlice = MultipleSparkJobExecutionStrategy.this.clusteringOperation2FileSlice(basePath, clusteringOperation);
                Schema readerSchema = serializableTableSchemaWithMetaFields.get();
                // The internal schema is only parsed when the write config actually carries one.
                Option<InternalSchema> internalSchemaOption = Option.empty();
                if (isInternalSchemaPresent) {
                    internalSchemaOption = SerDeHelper.fromJson(internalSchemaStr);
                }
                Option<HoodieReaderContext> readerContextOpt = broadcastManager.retrieveFileGroupReaderContext(new StoragePath(basePath));
                Configuration conf = broadcastManager.retrieveStorageConfig().get();
                // NOTE(review): readerContextOpt.get() is called without a presence check.
                // The trailing 0L / Long.MAX_VALUE arguments presumably select the whole file slice -- TODO confirm.
                HoodieFileGroupReader fileGroupReader = new HoodieFileGroupReader(readerContextOpt.get(), MultipleSparkJobExecutionStrategy.this.getHoodieTable().getMetaClient().getStorage().newInstance(new StoragePath(basePath), new HadoopStorageConfiguration(conf)), basePath, instantTime, fileSlice, readerSchema, readerSchema, internalSchemaOption, MultipleSparkJobExecutionStrategy.this.getHoodieTable().getMetaClient(), MultipleSparkJobExecutionStrategy.this.getHoodieTable().getMetaClient().getTableConfig().getProps(), 0L, Long.MAX_VALUE, usePosition);
                fileGroupReader.initRecordIterators();
                // NOTE(review): the closable iterator is returned as a plain Iterator; nothing in this
                // method closes it explicitly -- verify how it gets released.
                HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow> recordIterator = fileGroupReader.getClosableIterator();
                return recordIterator;
            }
        }).rdd();
        return HoodieUnsafeUtils.createDataFrameFromRDD(((HoodieSparkEngineContext)this.getEngineContext()).getSqlContext().sparkSession(), (RDD<InternalRow>)internalRowRDD, sparkSchemaWithMetaFields);
    }

    /**
     * Rebuilds a {@link FileSlice} from a clustering operation.
     *
     * <p>The base file (if any) is resolved against {@code basePath}; log files are resolved
     * against the partition directory and ordered with {@code HoodieLogFile.LogFileComparator}.
     * The slice's base instant time is the base file's commit time when a base file exists,
     * otherwise the delta commit time of the first log file in that order.
     *
     * @param basePath            table base path
     * @param clusteringOperation operation describing the base and log files
     * @return the file slice holding the operation's base file and log files
     */
    private FileSlice clusteringOperation2FileSlice(String basePath, ClusteringOperation clusteringOperation) {
        String partitionPath = clusteringOperation.getPartitionPath();

        boolean hasBaseFile = !StringUtils.isNullOrEmpty(clusteringOperation.getDataFilePath());
        HoodieBaseFile baseFile = null;
        if (hasBaseFile) {
            StoragePath baseFilePath = new StoragePath(basePath, clusteringOperation.getDataFilePath());
            baseFile = new HoodieBaseFile(baseFilePath.toString());
        }

        List<HoodieLogFile> logFiles = clusteringOperation.getDeltaFilePaths().stream()
            .map(logFileName -> new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(basePath, partitionPath), logFileName)))
            .sorted(new HoodieLogFile.LogFileComparator())
            .collect(Collectors.toList());

        // A slice needs at least one file to derive its base instant time from.
        ValidationUtils.checkState(hasBaseFile || !logFiles.isEmpty(), "Both base file and log files are missing from this clustering operation " + clusteringOperation);

        String baseInstantTime;
        if (hasBaseFile) {
            baseInstantTime = baseFile.getCommitTime();
        } else {
            baseInstantTime = logFiles.get(0).getDeltaCommitTime();
        }

        FileSlice slice = new FileSlice(partitionPath, baseInstantTime, clusteringOperation.getFileId());
        slice.setBaseFile(baseFile);
        for (HoodieLogFile logFile : logFiles) {
            slice.addLogFile(logFile);
        }
        return slice;
    }

    /**
     * Collects a stream of write-status RDDs into an array, in encounter order.
     *
     * <p>Java cannot instantiate an array of a parameterized type, so the array is created with
     * the raw component type and converted unchecked. This is safe because every element comes
     * from a {@code Stream<JavaRDD<WriteStatus>>}.
     *
     * @param writeStatusRDDStream stream of RDDs to collect; it is consumed by this call
     * @return array holding the stream's elements
     */
    @SuppressWarnings("unchecked")
    private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
        return writeStatusRDDStream.toArray(JavaRDD[]::new);
    }

    /**
     * Opens the reader for a clustering operation's base file, wrapping it in a bootstrap reader
     * when the operation refers to a bootstrap file.
     *
     * <p>A bootstrap reader is built only when both the operation's bootstrap file path and the
     * table's bootstrap base path are non-empty. When the table has partition fields, the
     * partition values are derived from the portion of the bootstrap file path between the
     * bootstrap base path and the file name.
     *
     * @param clusteringOp operation whose base (and optionally bootstrap) file is opened
     * @return empty when no base file reader is available; otherwise the base reader or the
     *         bootstrap reader wrapping it
     * @throws HoodieClusteringException if an {@link IOException} occurs while building the bootstrap reader
     */
    protected Option<HoodieFileReader> getBaseOrBootstrapFileReader(ClusteringOperation clusteringOp) {
        HoodieStorage storage = getHoodieTable().getStorage();
        StorageConfiguration<?> storageConf = getHoodieTable().getStorageConf();
        HoodieTableConfig tableConfig = getHoodieTable().getMetaClient().getTableConfig();
        String bootstrapBasePath = tableConfig.getBootstrapBasePath().orElse(null);
        Option<String[]> partitionFields = tableConfig.getPartitionFields();

        Option<HoodieFileReader> baseReaderOpt = ClusteringUtils.getBaseFileReader(storage, recordType, writeConfig, clusteringOp.getDataFilePath());
        if (baseReaderOpt.isEmpty()) {
            return Option.empty();
        }

        String bootstrapFilePath = clusteringOp.getBootstrapFilePath();
        boolean isBootstrapped = StringUtils.nonEmpty(bootstrapFilePath) && StringUtils.nonEmpty(bootstrapBasePath);
        if (!isBootstrapped) {
            return baseReaderOpt;
        }

        try {
            HoodieFileReader baseReader = baseReaderOpt.get();
            Object[] partitionValues = new Object[0];
            if (partitionFields.isPresent()) {
                // Skip the bootstrap base path plus the separator that follows it; stop before the file name.
                int partitionStart = bootstrapFilePath.indexOf(bootstrapBasePath) + bootstrapBasePath.length() + 1;
                int fileNameSeparator = bootstrapFilePath.lastIndexOf("/");
                String partitionFilePath = bootstrapFilePath.substring(partitionStart, fileNameSeparator);
                partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseReader.getSchema(), storageConf.unwrapAs(Configuration.class));
            }
            HoodieFileReader bootstrapDataReader = HoodieSparkIOFactory.getHoodieSparkIOFactory(storage)
                .getReaderFactory(recordType)
                .getFileReader(writeConfig, new StoragePath(bootstrapFilePath));
            return Option.of(HoodieSparkIOFactory.getHoodieSparkIOFactory(storage)
                .getReaderFactory(recordType)
                .newBootstrapFileReader(baseReader, bootstrapDataReader, partitionFields, partitionValues));
        } catch (IOException e) {
            throw new HoodieClusteringException("Error reading base file", e);
        }
    }
}

