/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink {@link ScanTableSource} that reads a Hudi table.
 *
 * <p>Supports partition, projection, limit and filter push-down. When
 * {@link FlinkOptions#READ_AS_STREAMING} is enabled the source is unbounded: a single-parallelism
 * {@link StreamReadMonitoringFunction} emits {@link MergeOnReadInputSplit}s that are read by
 * {@link StreamReadOperator}s. Otherwise a bounded {@link InputFormatSourceFunction} is used.
 */
public class HoodieTableSource
        implements ScanTableSource,
                SupportsPartitionPushDown,
                SupportsProjectionPushDown,
                SupportsLimitPushDown,
                SupportsFilterPushDown {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class);

    /** Sentinel stored in {@link #limit} when no limit has been pushed down. */
    private static final int NO_LIMIT_CONSTANT = -1;

    private final transient org.apache.hadoop.conf.Configuration hadoopConf;
    /** May be {@code null}; {@link #getStreamInputFormat()} and {@link #reset()} check for that. */
    private final transient HoodieTableMetaClient metaClient;
    private final long maxCompactionMemoryInBytes;
    private final ResolvedSchema schema;
    private final org.apache.hadoop.fs.Path path;
    private final List<String> partitionKeys;
    private final String defaultPartName;
    private final Configuration conf;
    private final FileIndex fileIndex;

    /** Indices into {@link #schema} of the columns to produce (projection push-down). */
    private int[] requiredPos;
    /** Pushed-down row limit, or {@link #NO_LIMIT_CONSTANT} when unlimited. */
    private long limit;
    /** Pushed-down filters; recorded and carried over by {@link #copy()}, not consumed by the readers here. */
    private List<Expression> filters;
    /** Pruned partitions; {@code null} until partition push-down or {@link #getOrFetchPartitions()} sets it. */
    private List<Map<String, String>> requiredPartitions;

    /**
     * Creates a source with no push-down applied (all columns, all partitions, no limit, no filters).
     */
    public HoodieTableSource(ResolvedSchema schema, org.apache.hadoop.fs.Path path, List<String> partitionKeys, String defaultPartName, Configuration conf) {
        this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
    }

    /**
     * Creates a source, optionally carrying previously pushed-down state.
     *
     * @param requiredPartitions pruned partitions, or {@code null} for "not pruned"
     * @param requiredPos        projected column indices, or {@code null} for all columns
     * @param limit              row limit, or {@code null} for no limit
     * @param filters            pushed-down filters, or {@code null} for none
     */
    public HoodieTableSource(ResolvedSchema schema, org.apache.hadoop.fs.Path path, List<String> partitionKeys, String defaultPartName, Configuration conf, @Nullable List<Map<String, String>> requiredPartitions, @Nullable int[] requiredPos, @Nullable Long limit, @Nullable List<Expression> filters) {
        this.schema = schema;
        this.path = path;
        this.partitionKeys = partitionKeys;
        this.defaultPartName = defaultPartName;
        this.conf = conf;
        this.fileIndex = FileIndex.instance(this.path, this.conf);
        this.requiredPartitions = requiredPartitions;
        this.requiredPos = requiredPos == null ? IntStream.range(0, schema.getColumnCount()).toArray() : requiredPos;
        this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
        this.filters = filters == null ? Collections.emptyList() : filters;
        this.hadoopConf = StreamerUtil.getHadoopConf();
        this.metaClient = StreamerUtil.metaClientForReader(conf, this.hadoopConf);
        this.maxCompactionMemoryInBytes = HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf));
    }

    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return new DataStreamScanProvider() {

            @Override
            public boolean isBounded() {
                return !HoodieTableSource.this.conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
            }

            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                @SuppressWarnings("unchecked")
                TypeInformation<RowData> typeInfo = (TypeInformation<RowData>)
                        TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(HoodieTableSource.this.getProducedDataType());
                if (HoodieTableSource.this.conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
                    // Unbounded: one monitor task discovers splits, READ_TASKS readers consume them.
                    StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
                            HoodieTableSource.this.conf,
                            FilePathUtils.toFlinkPath(HoodieTableSource.this.path),
                            HoodieTableSource.this.maxCompactionMemoryInBytes,
                            HoodieTableSource.this.getRequiredPartitionPaths());
                    InputFormat<RowData, ?> inputFormat = HoodieTableSource.this.getInputFormat(true);
                    // getStreamInputFormat() always builds a MergeOnReadInputFormat, so the cast is safe.
                    OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
                            StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
                    SingleOutputStreamOperator<RowData> source = execEnv
                            .addSource(monitoringFunction, HoodieTableSource.this.getSourceOperatorName("split_monitor"))
                            .setParallelism(1)
                            .transform("split_reader", typeInfo, factory)
                            .setParallelism(HoodieTableSource.this.conf.getInteger(FlinkOptions.READ_TASKS));
                    return new DataStreamSource<>(source);
                }
                // Bounded: a plain input-format based source.
                InputFormatSourceFunction<RowData> func =
                        new InputFormatSourceFunction<>(HoodieTableSource.this.getInputFormat(), typeInfo);
                DataStreamSource<RowData> source = execEnv.addSource(func, HoodieTableSource.this.asSummaryString(), typeInfo);
                return source
                        .name(HoodieTableSource.this.getSourceOperatorName("bounded_source"))
                        .setParallelism(HoodieTableSource.this.conf.getInteger(FlinkOptions.READ_TASKS));
            }
        };
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return OptionsResolver.emitChangelog(this.conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
    }

    @Override
    public DynamicTableSource copy() {
        return new HoodieTableSource(this.schema, this.path, this.partitionKeys, this.defaultPartName, this.conf,
                this.requiredPartitions, this.requiredPos, this.limit, this.filters);
    }

    @Override
    public String asSummaryString() {
        return "HudiTableSource";
    }

    /**
     * Records the filters but accepts none of them, so the planner keeps applying all of them itself.
     */
    @Override
    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
        // Diamond infers ArrayList<Expression> here; ResolvedExpression is an Expression.
        this.filters = new ArrayList<>(filters);
        return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters));
    }

    /** Lists all partitions known to the file index as key/value maps. */
    @Override
    public Optional<List<Map<String, String>>> listPartitions() {
        List<Map<String, String>> partitions = this.fileIndex.getPartitions(
                this.partitionKeys, this.defaultPartName, this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
        return Optional.of(partitions);
    }

    @Override
    public void applyPartitions(List<Map<String, String>> partitions) {
        this.requiredPartitions = partitions;
    }

    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    /** Keeps only the top-level index of each projection path (nested projection is unsupported). */
    @Override
    public void applyProjection(int[][] projections) {
        this.requiredPos = Arrays.stream(projections).mapToInt(array -> array[0]).toArray();
    }

    @Override
    public void applyLimit(long limit) {
        this.limit = limit;
    }

    /** Row type of the projected columns, bridged to {@link RowData}. */
    private DataType getProducedDataType() {
        String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
        DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new DataType[0]);
        DataTypes.Field[] fields = Arrays.stream(this.requiredPos)
                .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
                .toArray(DataTypes.Field[]::new);
        return DataTypes.ROW(fields).bridgedTo(RowData.class);
    }

    /** Returns the pruned partitions, lazily listing (and caching) all partitions if none were pushed down. */
    private List<Map<String, String>> getOrFetchPartitions() {
        if (this.requiredPartitions == null) {
            this.requiredPartitions = this.listPartitions().orElse(Collections.emptyList());
        }
        return this.requiredPartitions;
    }

    /** Builds an operator display name such as {@code split_monitor(table=[t], fields=[a, b])}. */
    private String getSourceOperatorName(String operatorName) {
        String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
        List<String> fields = Arrays.stream(this.requiredPos)
                .mapToObj(i -> schemaFieldNames[i])
                .collect(Collectors.toList());
        StringBuilder sb = new StringBuilder();
        sb.append(operatorName)
                .append("(")
                .append("table=").append(Collections.singletonList(this.conf.getString(FlinkOptions.TABLE_NAME)))
                .append(", ")
                .append("fields=").append(fields)
                .append(")");
        return sb.toString();
    }

    /**
     * Relative paths of the pruned partitions, or {@code null} when no partition pruning has been
     * recorded (callers presumably treat {@code null} as "all partitions" — confirm in FileIndex).
     */
    @Nullable
    private Set<String> getRequiredPartitionPaths() {
        if (this.requiredPartitions == null) {
            return null;
        }
        return FilePathUtils.toRelativePartitionPaths(
                this.partitionKeys, this.requiredPartitions, this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
    }

    /**
     * Builds one {@link MergeOnReadInputSplit} per latest merged file slice of each required partition,
     * as of the last completed (or compaction) instant.
     *
     * @return the splits; empty when there are no partitions or no completed instant yet
     * @throws HoodieException if partitions exist but contain no files
     */
    private List<MergeOnReadInputSplit> buildFileIndex() {
        this.fileIndex.setPartitionPaths(this.getRequiredPartitionPaths());
        List<String> relPartitionPaths = this.fileIndex.getOrBuildPartitionPaths();
        if (relPartitionPaths.isEmpty()) {
            return Collections.emptyList();
        }
        FileStatus[] fileStatuses = this.fileIndex.getFilesInPartitions();
        if (fileStatuses.length == 0) {
            throw new HoodieException("No files found for reading in user provided path.");
        }
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(this.metaClient,
                this.metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), fileStatuses);
        Option<HoodieInstant> lastInstant = fsView.getLastInstant();
        if (!lastInstant.isPresent()) {
            // Files exist but nothing is committed yet: there is nothing readable.
            LOG.warn("No completed instant found in the timeline, no input splits are generated");
            return Collections.emptyList();
        }
        String latestCommit = lastInstant.get().getTimestamp();
        String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        AtomicInteger cnt = new AtomicInteger(0);
        return relPartitionPaths.stream()
                .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
                        .map(fileSlice -> {
                            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
                            Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                                    .sorted(HoodieLogFile.getLogFileComparator())
                                    .map(logFile -> logFile.getPath().toString())
                                    .collect(Collectors.toList()));
                            return new MergeOnReadInputSplit(cnt.getAndIncrement(), basePath, logPaths, latestCommit,
                                    this.metaClient.getBasePath(), this.maxCompactionMemoryInBytes, mergeType, null);
                        })
                        .collect(Collectors.toList()))
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    /** Returns the batch (bounded) input format. */
    public InputFormat<RowData, ?> getInputFormat() {
        return this.getInputFormat(false);
    }

    /** Returns the streaming input format when {@code isStreaming}, otherwise the batch one. */
    @VisibleForTesting
    public InputFormat<RowData, ?> getInputFormat(boolean isStreaming) {
        return isStreaming ? this.getStreamInputFormat() : this.getBatchInputFormat();
    }

    /**
     * Selects the bounded input format for {@link FlinkOptions#QUERY_TYPE} ("snapshot",
     * "read_optimized" or "incremental") and {@link FlinkOptions#TABLE_TYPE}.
     *
     * @throws HoodieException for an unknown query type or table type
     */
    private InputFormat<RowData, ?> getBatchInputFormat() {
        Schema tableAvroSchema = this.getTableAvroSchema();
        DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        RowType rowType = (RowType) rowDataType.getLogicalType();
        RowType requiredRowType = (RowType) this.getProducedDataType().notNull().getLogicalType();
        String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
        switch (queryType) {
            case "snapshot": {
                HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
                switch (tableType) {
                    case MERGE_ON_READ: {
                        List<MergeOnReadInputSplit> inputSplits = this.buildFileIndex();
                        if (inputSplits.isEmpty()) {
                            LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
                            return InputFormats.EMPTY_INPUT_FORMAT;
                        }
                        return this.mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, inputSplits, false);
                    }
                    case COPY_ON_WRITE:
                        return this.baseFileOnlyInputFormat();
                    default:
                        throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
                }
            }
            case "read_optimized":
                return this.baseFileOnlyInputFormat();
            case "incremental": {
                IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
                        .conf(this.conf)
                        .path(FilePathUtils.toFlinkPath(this.path))
                        .maxCompactionMemoryInBytes(this.maxCompactionMemoryInBytes)
                        .requiredPartitions(this.getRequiredPartitionPaths())
                        .build();
                IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(this.metaClient, this.hadoopConf);
                if (result.isEmpty()) {
                    LOG.warn("No input splits generate for incremental read, returns empty collection instead");
                    return InputFormats.EMPTY_INPUT_FORMAT;
                }
                return this.mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, result.getInputSplits(), false);
            }
            default: {
                String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now",
                        queryType, "snapshot", "read_optimized", "incremental");
                throw new HoodieException(errMsg);
            }
        }
    }

    /**
     * Builds the streaming input format (only the "snapshot" query type is supported). Splits are
     * supplied later by the monitor, so it is created with an empty split list. Deletes are emitted
     * only for MERGE_ON_READ tables.
     *
     * @throws HoodieException for any other query type
     */
    private InputFormat<RowData, ?> getStreamInputFormat() {
        // Without a meta client the table schema cannot be resolved, so fall back to the DDL.
        Schema tableAvroSchema = this.metaClient == null ? this.inferSchemaFromDdl() : this.getTableAvroSchema();
        DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        RowType rowType = (RowType) rowDataType.getLogicalType();
        RowType requiredRowType = (RowType) this.getProducedDataType().notNull().getLogicalType();
        String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
        if ("snapshot".equals(queryType)) {
            HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
            boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
            return this.mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList(), emitDelete);
        }
        String errMsg = String.format("Invalid query type : '%s', options ['%s'] are supported now", queryType, "snapshot");
        throw new HoodieException(errMsg);
    }

    /** Builds a {@link MergeOnReadInputFormat} over the given splits, applying the pushed-down limit. */
    private MergeOnReadInputFormat mergeOnReadInputFormat(RowType rowType, RowType requiredRowType, Schema tableAvroSchema, DataType rowDataType, List<MergeOnReadInputSplit> inputSplits, boolean emitDelete) {
        MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
                rowType,
                requiredRowType,
                tableAvroSchema.toString(),
                AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
                inputSplits,
                this.conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
        return MergeOnReadInputFormat.builder()
                .config(this.conf)
                .tableState(hoodieTableState)
                .fieldTypes(rowDataType.getChildren())
                .defaultPartName(this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
                .limit(this.limit)
                .emitDelete(emitDelete)
                .build();
    }

    /**
     * Builds a {@link CopyOnWriteInputFormat} over the read paths, keeping only the files accepted by
     * {@link HoodieROTablePathFilter}; returns an empty format when there is nothing to read.
     */
    private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
        org.apache.hadoop.fs.Path[] paths = this.getReadPaths();
        if (paths.length == 0) {
            return InputFormats.EMPTY_INPUT_FORMAT;
        }
        long effectiveLimit = this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit;
        CopyOnWriteInputFormat format = new CopyOnWriteInputFormat(
                FilePathUtils.toFlinkPaths(paths),
                this.schema.getColumnNames().toArray(new String[0]),
                this.schema.getColumnDataTypes().toArray(new DataType[0]),
                this.requiredPos,
                this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
                effectiveLimit,
                FormatUtils.getParquetConf(this.conf, this.hadoopConf),
                this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE));
        format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
        return format;
    }

    /** Derives the Avro schema from the DDL's physical row type, plus Hudi metadata fields. */
    private Schema inferSchemaFromDdl() {
        Schema ddlSchema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
        return HoodieAvroUtils.addMetadataFields(ddlSchema, this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
    }

    /**
     * Resolves the table's Avro schema from its metadata, falling back to the DDL-derived schema
     * when resolution fails for any reason (every {@code Throwable} is caught and logged).
     */
    @VisibleForTesting
    public Schema getTableAvroSchema() {
        try {
            TableSchemaResolver schemaUtil = new TableSchemaResolver(this.metaClient, this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
            return schemaUtil.getTableAvroSchema();
        } catch (Throwable e) {
            LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
            return this.inferSchemaFromDdl();
        }
    }

    @VisibleForTesting
    public HoodieTableMetaClient getMetaClient() {
        return this.metaClient;
    }

    @VisibleForTesting
    public Configuration getConf() {
        return this.conf;
    }

    /** Reloads the timeline (when a meta client exists) and clears cached partition state. */
    @VisibleForTesting
    public void reset() {
        if (this.metaClient != null) {
            this.metaClient.reloadActiveTimeline();
        }
        this.requiredPartitions = null;
        this.fileIndex.reset();
    }

    /**
     * Paths to read: the table path for non-partitioned tables, otherwise one path per required
     * partition (listing all partitions if none were pushed down).
     */
    @VisibleForTesting
    public org.apache.hadoop.fs.Path[] getReadPaths() {
        if (this.partitionKeys.isEmpty()) {
            return new org.apache.hadoop.fs.Path[] {this.path};
        }
        return FilePathUtils.partitionPath2ReadPath(this.path, this.partitionKeys, this.getOrFetchPartitions(),
                this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
    }

    /**
     * Flink file filter delegating to {@link HoodieROTablePathFilter}. {@code filterPath} returns the
     * negation of the Hudi filter's {@code accept}, so (given Flink's convention that {@code true}
     * means "skip") only paths the Hudi filter accepts are read.
     */
    private static class LatestFileFilter extends FilePathFilter {
        private final HoodieROTablePathFilter hoodieFilter;

        public LatestFileFilter(org.apache.hadoop.conf.Configuration hadoopConf) {
            this.hoodieFilter = new HoodieROTablePathFilter(hadoopConf);
        }

        @Override
        public boolean filterPath(Path filePath) {
            return !this.hoodieFilter.accept(new org.apache.hadoop.fs.Path(filePath.toUri()));
        }
    }
}

