/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import io.pravega.connectors.flink.FlinkPravegaOutputFormat;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.PravegaEventRouter;
import java.util.Arrays;
import java.util.function.Function;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

@Deprecated
public class FlinkPravegaTableSink
implements AppendStreamTableSink<Row>,
BatchTableSink<Row> {
    protected final Function<TableSchema, FlinkPravegaWriter<Row>> writerFactory;
    protected final Function<TableSchema, FlinkPravegaOutputFormat<Row>> outputFormatFactory;
    protected TableSchema schema;

    protected FlinkPravegaTableSink(Function<TableSchema, FlinkPravegaWriter<Row>> writerFactory, Function<TableSchema, FlinkPravegaOutputFormat<Row>> outputFormatFactory, TableSchema schema) {
        this.writerFactory = (Function)Preconditions.checkNotNull(writerFactory, (String)"writerFactory");
        this.outputFormatFactory = (Function)Preconditions.checkNotNull(outputFormatFactory, (String)"outputFormatFactory");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns((TableSchema)schema);
    }

    private FlinkPravegaTableSink createCopy() {
        return new FlinkPravegaTableSink(this.writerFactory, this.outputFormatFactory, this.schema);
    }

    public void emitDataStream(DataStream<Row> dataStream) {
        this.consumeDataStream(dataStream);
    }

    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        Preconditions.checkState((this.schema != null ? 1 : 0) != 0, (Object)"Table sink is not configured");
        FlinkPravegaWriter<Row> writer = this.writerFactory.apply(this.schema);
        return dataStream.addSink(writer).setParallelism(dataStream.getParallelism()).name(TableConnectorUtils.generateRuntimeName(this.getClass(), (String[])this.getFieldNames()));
    }

    public void emitDataSet(DataSet<Row> dataSet) {
        Preconditions.checkState((this.schema != null ? 1 : 0) != 0, (Object)"Table sink is not configured");
        FlinkPravegaOutputFormat<Row> outputFormat = this.outputFormatFactory.apply(this.schema);
        dataSet.output(outputFormat);
    }

    public DataType getConsumedDataType() {
        return this.schema.toRowDataType();
    }

    public TableSchema getTableSchema() {
        return this.schema;
    }

    public FlinkPravegaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        Preconditions.checkNotNull((Object)fieldNames, (String)"fieldNames");
        Preconditions.checkNotNull(fieldTypes, (String)"fieldTypes");
        Preconditions.checkArgument((fieldNames.length == fieldTypes.length ? 1 : 0) != 0, (Object)"Number of provided field names and types does not match.");
        FlinkPravegaTableSink copy = this.createCopy();
        DataType[] dataTypes = (DataType[])Arrays.stream(fieldTypes).map(TypeConversions::fromLegacyInfoToDataType).toArray(DataType[]::new);
        copy.schema = TableSchema.builder().fields(fieldNames, dataTypes).build();
        return copy;
    }

    public static class RowBasedRouter
    implements PravegaEventRouter<Row> {
        private final int keyIndex;

        public RowBasedRouter(String keyFieldName, String[] fieldNames, DataType[] fieldTypes) {
            Preconditions.checkArgument((fieldNames.length == fieldTypes.length ? 1 : 0) != 0, (Object)"Number of provided field names and types does not match.");
            int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
            Preconditions.checkArgument((keyIndex >= 0 ? 1 : 0) != 0, (Object)("Key field '" + keyFieldName + "' not found"));
            Preconditions.checkArgument((boolean)DataTypes.STRING().equals((Object)fieldTypes[keyIndex]), (Object)"Key field must be of type 'STRING'");
            this.keyIndex = keyIndex;
        }

        @Override
        public String getRoutingKey(Row event) {
            return (String)event.getField(this.keyIndex);
        }

        int getKeyIndex() {
            return this.keyIndex;
        }
    }
}

