/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import io.pravega.connectors.flink.FlinkPravegaInputFormat;
import io.pravega.connectors.flink.FlinkPravegaReader;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

@Deprecated
public class FlinkPravegaTableSource
implements StreamTableSource<Row>,
BatchTableSource<Row>,
DefinedProctimeAttribute,
DefinedRowtimeAttributes {
    private final Supplier<FlinkPravegaReader<Row>> sourceFunctionFactory;
    private final Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory;
    private final TableSchema schema;
    private String proctimeAttribute;
    private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;

    protected FlinkPravegaTableSource(Supplier<FlinkPravegaReader<Row>> sourceFunctionFactory, Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory, TableSchema schema) {
        this.sourceFunctionFactory = (Supplier)Preconditions.checkNotNull(sourceFunctionFactory, (String)"sourceFunctionFactory");
        this.inputFormatFactory = (Supplier)Preconditions.checkNotNull(inputFormatFactory, (String)"inputFormatFactory");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns((TableSchema)schema);
    }

    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        FlinkPravegaReader<Row> reader = this.sourceFunctionFactory.get();
        reader.initialize();
        return env.addSource(reader).name(this.explainSource());
    }

    public DataSet<Row> getDataSet(ExecutionEnvironment env) {
        FlinkPravegaInputFormat<Row> inputFormat = this.inputFormatFactory.get();
        return env.createInput(inputFormat, this.getProducedTypeInformation()).name(this.explainSource());
    }

    public DataType getProducedDataType() {
        return this.schema.toRowDataType();
    }

    private TypeInformation<Row> getProducedTypeInformation() {
        return TypeConversions.fromDataTypeToLegacyInfo((DataType)this.getProducedDataType());
    }

    public TableSchema getTableSchema() {
        return this.schema;
    }

    public String getProctimeAttribute() {
        return this.proctimeAttribute;
    }

    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return this.rowtimeAttributeDescriptors;
    }

    protected void setProctimeAttribute(String proctimeAttribute) {
        if (proctimeAttribute != null) {
            Optional tpe = this.schema.getFieldDataType(proctimeAttribute);
            if (!tpe.isPresent()) {
                throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not present in TableSchema.");
            }
            if (((DataType)tpe.get()).getLogicalType().getTypeRoot() != LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
                throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not of type TIMESTAMP.");
            }
        }
        this.proctimeAttribute = proctimeAttribute;
    }

    protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
        for (RowtimeAttributeDescriptor desc : rowtimeAttributeDescriptors) {
            String rowtimeAttribute = desc.getAttributeName();
            Optional tpe = this.schema.getFieldDataType(rowtimeAttribute);
            if (!tpe.isPresent()) {
                throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not present in TableSchema.");
            }
            if (((DataType)tpe.get()).getLogicalType().getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) continue;
            throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not of type TIMESTAMP.");
        }
        this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
    }

    public String explainSource() {
        return TableConnectorUtils.generateRuntimeName(this.getClass(), (String[])this.schema.getFieldNames());
    }
}

