/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.common;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.datasources.SparkColumnarFileReader;
import org.apache.spark.sql.hudi.MultipleColumnarFileFormatReader;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Buffer;

/**
 * {@link ReaderContextFactory} producing {@link HoodieReaderContext}s that read Spark
 * {@link InternalRow}s.
 *
 * <p>The constructor builds three broadcast variables through the engine context's
 * {@link JavaSparkContext}: the base file reader, the Hadoop configuration used for reading, and the
 * table config. {@link #getContext()} then assembles a reader context from the broadcast values.
 */
public class SparkReaderContextFactory implements ReaderContextFactory<InternalRow> {
    private final Broadcast<SparkColumnarFileReader> baseFileReaderBroadcast;
    private final Broadcast<SerializableConfiguration> configurationBroadcast;
    private final Broadcast<HoodieTableConfig> tableConfigBroadcast;

    /**
     * Creates a factory using a {@link TableSchemaResolver} built from {@code metaClient} and the
     * Spark adapter exposed by {@code SparkAdapterSupport}.
     *
     * @param hoodieSparkEngineContext engine context supplying the SQL conf and the Spark context
     * @param metaClient               meta client of the table to read
     */
    public SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineContext, HoodieTableMetaClient metaClient) {
        this(hoodieSparkEngineContext, metaClient, new TableSchemaResolver(metaClient), SparkAdapterSupport$.MODULE$.sparkAdapter());
    }

    /**
     * Creates a factory and broadcasts the reader, the configuration and the table config.
     *
     * <p>The base file reader is chosen from the table config: a
     * {@link MultipleColumnarFileFormatReader} wrapping a Parquet and an ORC reader when multiple
     * base file formats are enabled, an ORC reader when the base file format is
     * {@link HoodieFileFormat#ORC}, and a Parquet reader otherwise.
     *
     * @param hoodieSparkEngineContext engine context supplying the SQL conf and the Spark context
     * @param metaClient               meta client of the table to read
     * @param resolver                 schema resolver used for schema-evolution settings and the ORC data schema
     * @param sparkAdapter             adapter used to create the Parquet / ORC file readers
     * @throws HoodieException if the ORC file reader cannot be created
     */
    public SparkReaderContextFactory(HoodieSparkEngineContext hoodieSparkEngineContext, HoodieTableMetaClient metaClient, TableSchemaResolver resolver, SparkAdapter sparkAdapter) {
        SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sparkSession().sessionState().conf();
        JavaSparkContext jsc = hoodieSparkEngineContext.jsc();

        // Reader option telling the file format whether to return batches; it follows the
        // session's vectorized Parquet reader setting.
        boolean returningBatch = sqlConf.parquetVectorizedReaderEnabled();
        Map<String, String> options = Map$.MODULE$.<String, String>empty()
                .$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), Boolean.toString(returningBatch)));

        InstantFileNameGenerator fileNameGenerator = metaClient.getTimelineLayout().getInstantFileNameGenerator();
        HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        java.util.Map<String, String> schemaEvolutionConfigs =
                getSchemaEvolutionConfigs(resolver, timeline, fileNameGenerator, metaClient.getBasePath().toString());

        // Work on a private copy of the Hadoop configuration, then layer the schema-evolution
        // entries and two session-level Parquet settings on top of it.
        Configuration configs = getHadoopConfiguration(jsc.hadoopConfiguration());
        schemaEvolutionConfigs.forEach(configs::set);
        copySqlConf(sqlConf, configs, SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key());
        copySqlConf(sqlConf, configs, SQLConf.PARQUET_WRITE_LEGACY_FORMAT().key());
        this.configurationBroadcast = jsc.broadcast(new SerializableConfiguration(configs));

        HoodieTableConfig tableConfig = metaClient.getTableConfig();
        if (tableConfig.isMultipleBaseFileFormatsEnabled()) {
            SparkColumnarFileReader parquetFileReader = sparkAdapter.createParquetFileReader(false, sqlConf, options, configs);
            SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, sqlConf, options, configs, sparkAdapter);
            this.baseFileReaderBroadcast = jsc.broadcast(new MultipleColumnarFileFormatReader(parquetFileReader, orcFileReader));
        } else if (tableConfig.getBaseFileFormat() == HoodieFileFormat.ORC) {
            this.baseFileReaderBroadcast = jsc.broadcast(getOrcFileReader(resolver, sqlConf, options, configs, sparkAdapter));
        } else {
            this.baseFileReaderBroadcast = jsc.broadcast(sparkAdapter.createParquetFileReader(false, sqlConf, options, configs));
        }
        this.tableConfigBroadcast = jsc.broadcast(tableConfig);
    }

    /**
     * Builds a reader context from the broadcast reader, configuration and table config, with empty
     * filter lists.
     *
     * @return a new {@link SparkFileFormatInternalRowReaderContext}
     * @throws HoodieException if any broadcast variable is missing or the broadcast reader value is
     *                         {@code null}
     */
    @Override
    public HoodieReaderContext<InternalRow> getContext() {
        // NOTE: the messages below say "Parquet" although the broadcast reader may also be an ORC
        // or multi-format reader; the text is kept unchanged so existing log matching still works.
        if (this.baseFileReaderBroadcast == null) {
            throw new HoodieException("Spark Parquet reader broadcast is not initialized.");
        }
        if (this.configurationBroadcast == null) {
            throw new HoodieException("Configuration broadcast is not initialized.");
        }
        if (this.tableConfigBroadcast == null) {
            throw new HoodieException("Table config broadcast is not initialized.");
        }
        SparkColumnarFileReader baseFileReader = this.baseFileReaderBroadcast.getValue();
        if (baseFileReader == null) {
            throw new HoodieException("Cannot get the broadcast Spark Parquet reader.");
        }
        return new SparkFileFormatInternalRowReaderContext(
                baseFileReader,
                emptyFilters(),
                emptyFilters(),
                new HadoopStorageConfiguration(this.configurationBroadcast.getValue().value()),
                this.tableConfigBroadcast.getValue());
    }

    /** Returns a fresh, empty Scala sequence of filters. */
    private static Seq<Filter> emptyFilters() {
        List<Filter> filters = Collections.emptyList();
        return JavaConverters.asScalaBufferConverter(filters).asScala().toSeq();
    }

    /** Copies the value of {@code key} from the SQL conf into the Hadoop configuration. */
    private static void copySqlConf(SQLConf sqlConf, Configuration target, String key) {
        target.set(key, sqlConf.getConfString(key));
    }

    /**
     * Creates an ORC file reader whose data schema is the table's Avro schema converted to a Spark
     * {@link StructType}.
     *
     * @throws HoodieException wrapping any exception raised while resolving the schema or creating
     *                         the reader
     */
    private static SparkColumnarFileReader getOrcFileReader(TableSchemaResolver resolver, SQLConf sqlConf, Map<String, String> options, Configuration configs, SparkAdapter sparkAdapter) {
        try {
            StructType dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema());
            return sparkAdapter.createOrcFileReader(false, sqlConf, options, configs, dataSchema);
        } catch (Exception e) {
            throw new HoodieException("Failed to broadcast ORC file reader", e);
        }
    }

    /**
     * Returns a copy of {@code configuration} with a fixed set of Spark SQL reader flags applied.
     *
     * <p>The input is copied first, so the flags set here are not written into the caller's
     * configuration object.
     */
    private static Configuration getHadoopConfiguration(Configuration configuration) {
        Configuration hadoopConf = new Configuration(configuration);
        hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), false);
        hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
        hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
        hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), true);
        hadoopConf.setBoolean("spark.sql.legacy.parquet.nanosAsLong", false);
        // This key is only set when running on Spark 3.4 or newer.
        if (HoodieSparkUtils.gteqSpark3_4()) {
            hadoopConf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", false);
        }
        return (Configuration) new HadoopStorageConfiguration(hadoopConf).getInline().unwrap();
    }

    /**
     * Builds the configuration entries needed when the table has an internal schema recorded in
     * its commit metadata.
     *
     * @return a map holding {@code hoodie.valid.commits.list} (comma-joined instant file names of
     *         {@code timeline}) and {@code hoodie.tablePath} when an internal schema is present;
     *         an empty map otherwise
     */
    private static java.util.Map<String, String> getSchemaEvolutionConfigs(TableSchemaResolver schemaResolver, HoodieTimeline timeline, InstantFileNameGenerator fileNameGenerator, String basePath) {
        Option<?> internalSchemaOpt = schemaResolver.getTableInternalSchemaFromCommitMetadata();
        java.util.Map<String, String> configs = new HashMap<>();
        if (internalSchemaOpt.isPresent()) {
            List<String> instantFiles = timeline.getInstants().stream()
                    .map(fileNameGenerator::getFileName)
                    .collect(Collectors.toList());
            configs.put("hoodie.valid.commits.list", String.join(",", instantFiles));
            configs.put("hoodie.tablePath", basePath);
        }
        return configs;
    }
}

