/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink;

import io.pravega.client.stream.StreamCut;
import io.pravega.connectors.flink.FlinkPravegaTableSink;
import io.pravega.connectors.flink.FlinkPravegaTableSource;
import io.pravega.connectors.flink.table.descriptors.Pravega;
import io.pravega.connectors.flink.table.descriptors.PravegaValidator;
import io.pravega.connectors.flink.util.ConnectorConfigurations;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

@Deprecated
public abstract class FlinkPravegaTableFactoryBase {
    protected Map<String, String> getRequiredContext() {
        HashMap<String, String> context = new HashMap<String, String>();
        context.put("connector.type", "pravega");
        context.put("connector.property-version", "1");
        context.put("connector.version", this.getVersion());
        return context;
    }

    protected List<String> getSupportedProperties() {
        ArrayList<String> properties = new ArrayList<String>();
        properties.add("connector.metrics");
        properties.add("connector.connection-config");
        properties.add("connector.connection-config.controller-uri");
        properties.add("connector.connection-config.default-scope");
        properties.add("connector.connection-config.security");
        properties.add("connector.connection-config.security.auth-type");
        properties.add("connector.connection-config.security.auth-token");
        properties.add("connector.connection-config.security.validate-hostname");
        properties.add("connector.connection-config.security.trust-store");
        properties.add("connector.reader");
        properties.add("connector.reader.stream-info");
        properties.add("connector.reader.stream-info.#.scope");
        properties.add("connector.reader.stream-info.#.stream");
        properties.add("connector.reader.stream-info.#.start-streamcut");
        properties.add("connector.reader.stream-info.#.end-streamcut");
        properties.add("connector.reader.reader-group");
        properties.add("connector.reader.reader-group.uid");
        properties.add("connector.reader.reader-group.scope");
        properties.add("connector.reader.reader-group.name");
        properties.add("connector.reader.reader-group.refresh-interval");
        properties.add("connector.reader.reader-group.event-read-timeout-interval");
        properties.add("connector.reader.reader-group.checkpoint-initiate-timeout-interval");
        properties.add("connector.reader.user.timestamp-assigner");
        properties.add("connector.writer");
        properties.add("connector.writer.scope");
        properties.add("connector.writer.stream");
        properties.add("connector.writer.mode");
        properties.add("connector.writer.txn-lease-renewal-interval");
        properties.add("connector.writer.enable-watermark");
        properties.add("connector.writer.routingkey-field-name");
        properties.add("schema.#.type");
        properties.add("schema.#.data-type");
        properties.add("schema.#.name");
        properties.add("schema.#.from");
        properties.add("schema.#.expr");
        properties.add("schema.#.proctime");
        properties.add("schema.#.rowtime.timestamps.type");
        properties.add("schema.#.rowtime.timestamps.from");
        properties.add("schema.#.rowtime.timestamps.class");
        properties.add("schema.#.rowtime.timestamps.serialized");
        properties.add("schema.#.rowtime.watermarks.type");
        properties.add("schema.#.rowtime.watermarks.class");
        properties.add("schema.#.rowtime.watermarks.serialized");
        properties.add("schema.#.rowtime.watermarks.delay");
        properties.add("schema.watermark.#.rowtime");
        properties.add("schema.watermark.#.strategy.expr");
        properties.add("schema.watermark.#.strategy.data-type");
        properties.add("format.*");
        return properties;
    }

    protected abstract String getVersion();

    protected abstract boolean isStreamEnvironment();

    protected DescriptorProperties getValidatedProperties(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        boolean supportsSourceTimestamps = true;
        boolean supportsSourceWatermarks = true;
        new SchemaValidator(this.isStreamEnvironment(), supportsSourceTimestamps, supportsSourceWatermarks).validate(descriptorProperties);
        new PravegaValidator().validate(descriptorProperties);
        return descriptorProperties;
    }

    protected SerializationSchema<Row> getSerializationSchema(Map<String, String> properties) {
        SerializationSchemaFactory formatFactory = (SerializationSchemaFactory)TableFactoryService.find(SerializationSchemaFactory.class, properties, (ClassLoader)this.getClass().getClassLoader());
        return formatFactory.createSerializationSchema(properties);
    }

    protected DeserializationSchema<Row> getDeserializationSchema(Map<String, String> properties) {
        DeserializationSchemaFactory formatFactory = (DeserializationSchemaFactory)TableFactoryService.find(DeserializationSchemaFactory.class, properties, (ClassLoader)this.getClass().getClassLoader());
        return formatFactory.createDeserializationSchema(properties);
    }

    protected FlinkPravegaTableSource createFlinkPravegaTableSource(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = this.getValidatedProperties(properties);
        TableSchema schema = TableSchemaUtils.getPhysicalSchema((TableSchema)descriptorProperties.getTableSchema("schema"));
        DeserializationSchema<Row> deserializationSchema = this.getDeserializationSchema(properties);
        ConnectorConfigurations connectorConfigurations = new ConnectorConfigurations();
        connectorConfigurations.parseConfigurations(descriptorProperties, ConnectorConfigurations.ConfigurationType.READER);
        Pravega.TableSourceReaderBuilder tableSourceReaderBuilder = new Pravega().tableSourceReaderBuilder();
        tableSourceReaderBuilder.withDeserializationSchema(deserializationSchema);
        if (connectorConfigurations.getAssignerWithTimeWindows().isPresent()) {
            tableSourceReaderBuilder.withTimestampAssigner(connectorConfigurations.getAssignerWithTimeWindows().get());
        }
        if (connectorConfigurations.getUid().isPresent()) {
            tableSourceReaderBuilder.uid(connectorConfigurations.getUid().get());
        }
        if (connectorConfigurations.getRgScope().isPresent()) {
            tableSourceReaderBuilder.withReaderGroupScope(connectorConfigurations.getRgScope().get());
        }
        if (connectorConfigurations.getRgName().isPresent()) {
            tableSourceReaderBuilder.withReaderGroupName(connectorConfigurations.getRgName().get());
        }
        if (connectorConfigurations.getRefreshInterval().isPresent()) {
            tableSourceReaderBuilder.withReaderGroupRefreshTime(Time.milliseconds((long)connectorConfigurations.getRefreshInterval().get()));
        }
        if (connectorConfigurations.getEventReadTimeoutInterval().isPresent()) {
            tableSourceReaderBuilder.withEventReadTimeout(Time.milliseconds((long)connectorConfigurations.getEventReadTimeoutInterval().get()));
        }
        if (connectorConfigurations.getCheckpointInitiateTimeoutInterval().isPresent()) {
            tableSourceReaderBuilder.withCheckpointInitiateTimeout(Time.milliseconds((long)connectorConfigurations.getCheckpointInitiateTimeoutInterval().get()));
        }
        tableSourceReaderBuilder.withPravegaConfig(connectorConfigurations.getPravegaConfig());
        if (connectorConfigurations.getMetrics().isPresent()) {
            tableSourceReaderBuilder.enableMetrics(connectorConfigurations.getMetrics().get());
        }
        tableSourceReaderBuilder.withPravegaConfig(connectorConfigurations.getPravegaConfig());
        for (StreamWithBoundaries streamWithBoundaries : connectorConfigurations.getReaderStreams()) {
            if (streamWithBoundaries.getFrom() != StreamCut.UNBOUNDED && streamWithBoundaries.getTo() != StreamCut.UNBOUNDED) {
                tableSourceReaderBuilder.forStream(streamWithBoundaries.getStream(), streamWithBoundaries.getFrom(), streamWithBoundaries.getTo());
                continue;
            }
            if (streamWithBoundaries.getFrom() != StreamCut.UNBOUNDED) {
                tableSourceReaderBuilder.forStream(streamWithBoundaries.getStream(), streamWithBoundaries.getFrom());
                continue;
            }
            tableSourceReaderBuilder.forStream(streamWithBoundaries.getStream());
        }
        FlinkPravegaTableSource flinkPravegaTableSource = new FlinkPravegaTableSource(tableSourceReaderBuilder::buildSourceFunction, tableSourceReaderBuilder::buildInputFormat, schema);
        flinkPravegaTableSource.setRowtimeAttributeDescriptors(SchemaValidator.deriveRowtimeAttributes((DescriptorProperties)descriptorProperties));
        Optional procTimeAttribute = SchemaValidator.deriveProctimeAttribute((DescriptorProperties)descriptorProperties);
        if (procTimeAttribute.isPresent()) {
            flinkPravegaTableSource.setProctimeAttribute((String)procTimeAttribute.get());
        }
        return flinkPravegaTableSource;
    }

    protected FlinkPravegaTableSink createFlinkPravegaTableSink(Map<String, String> properties) {
        DescriptorProperties descriptorProperties = this.getValidatedProperties(properties);
        TableSchema schema = TableSchemaUtils.getPhysicalSchema((TableSchema)descriptorProperties.getTableSchema("schema"));
        SerializationSchema<Row> serializationSchema = this.getSerializationSchema(properties);
        ConnectorConfigurations connectorConfigurations = new ConnectorConfigurations();
        connectorConfigurations.parseConfigurations(descriptorProperties, ConnectorConfigurations.ConfigurationType.WRITER);
        Pravega.TableSinkWriterBuilder tableSinkWriterBuilder = new Pravega().tableSinkWriterBuilder();
        if (connectorConfigurations.getTxnLeaseRenewalInterval().isPresent()) {
            tableSinkWriterBuilder.withTxnLeaseRenewalPeriod(Time.milliseconds((long)connectorConfigurations.getTxnLeaseRenewalInterval().get()));
        }
        if (connectorConfigurations.getWriterMode().isPresent()) {
            tableSinkWriterBuilder.withWriterMode(connectorConfigurations.getWriterMode().get());
        }
        if (connectorConfigurations.getMetrics().isPresent()) {
            tableSinkWriterBuilder.enableMetrics(connectorConfigurations.getMetrics().get());
        }
        if (connectorConfigurations.getWatermark().isPresent()) {
            tableSinkWriterBuilder.enableWatermark(connectorConfigurations.getWatermark().get());
        }
        tableSinkWriterBuilder.withPravegaConfig(connectorConfigurations.getPravegaConfig());
        tableSinkWriterBuilder.withRoutingKeyField(connectorConfigurations.getRoutingKey());
        tableSinkWriterBuilder.withSerializationSchema(serializationSchema);
        tableSinkWriterBuilder.forStream(connectorConfigurations.getWriterStream());
        tableSinkWriterBuilder.withPravegaConfig(connectorConfigurations.getPravegaConfig());
        return new FlinkPravegaTableSink(tableSinkWriterBuilder::createSinkFunction, tableSinkWriterBuilder::createOutputFormat, schema);
    }
}

