/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.table;

import com.alibaba.ververica.connectors.common.source.AbstractParallelSource;
import com.alibaba.ververica.connectors.common.source.resolver.ConverterOp;
import com.alibaba.ververica.connectors.common.source.resolver.RecordResolver;
import com.alibaba.ververica.connectors.common.table.VervericaTableOptions;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractVervericaTableSource<IN, OUT>
implements StreamTableSource<OUT> {
    public static final Logger LOGGER = LoggerFactory.getLogger(AbstractVervericaTableSource.class);
    protected DescriptorProperties properties;

    public AbstractVervericaTableSource(DescriptorProperties properties) {
        this.properties = properties;
    }

    public TableSchema getTableSchema() {
        return TableSchemaUtils.getPhysicalSchema((TableSchema)this.properties.getTableSchema("schema"));
    }

    public DataType getProducedDataType() {
        TableSchema tableSchema = this.getTableSchema();
        DataTypes.Field[] fields = new DataTypes.Field[tableSchema.getFieldDataTypes().length];
        DataType[] dataTypes = tableSchema.getFieldDataTypes();
        String[] filedNames = tableSchema.getFieldNames();
        for (int i = 0; i < filedNames.length; ++i) {
            fields[i] = dataTypes[i].getLogicalType().getTypeRoot().equals((Object)LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) ? DataTypes.FIELD((String)filedNames[i], (DataType)((DataType)dataTypes[i].bridgedTo(Timestamp.class))) : (dataTypes[i].equals((Object)DataTypes.DATE()) ? DataTypes.FIELD((String)filedNames[i], (DataType)((DataType)dataTypes[i].bridgedTo(Date.class))) : DataTypes.FIELD((String)filedNames[i], (DataType)dataTypes[i]));
        }
        return (DataType)DataTypes.ROW((DataTypes.Field[])fields).bridgedTo(this.getOutputClass());
    }

    public DataStream<OUT> getDataStream(StreamExecutionEnvironment env) {
        DataStreamSource sourceStream;
        RecordResolver<IN, OUT> converter;
        block4: {
            converter = this.createRecordConverter();
            SourceFunction<IN> sourceFunction = this.createSourceFunction();
            sourceStream = env.addSource(sourceFunction);
            int parallelism = env.getParallelism();
            if (sourceFunction instanceof AbstractParallelSource) {
                AbstractParallelSource parallelSourceFunction = (AbstractParallelSource)sourceFunction;
                try {
                    List<String> partitionList = parallelSourceFunction.getPartitionList();
                    Preconditions.checkArgument((partitionList != null && partitionList.size() > 0 ? 1 : 0) != 0, (Object)"No partition found.");
                    int partitionCount = partitionList.size();
                    if (parallelism > partitionCount) {
                        parallelism = partitionCount;
                    }
                    sourceStream.setParallelism(parallelism);
                }
                catch (Exception e) {
                    LOGGER.error("Fail to get partition list from source function");
                    if (this.properties.getOptionalBoolean(VervericaTableOptions.IGNORE_PARTITION_FETCH_ERROR.key()).orElse((Boolean)VervericaTableOptions.IGNORE_PARTITION_FETCH_ERROR.defaultValue()).booleanValue()) break block4;
                    throw new RuntimeException("Fail to get partition list from source function", e);
                }
            }
        }
        return sourceStream.flatMap(new ConverterOp<IN, OUT>(converter, converter.getProducedType())).setParallelism(env.getParallelism());
    }

    public abstract RecordResolver<IN, OUT> createRecordConverter();

    public abstract SourceFunction<IN> createSourceFunction();

    public abstract boolean isBounded();

    public abstract Class<OUT> getOutputClass();
}

