/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.connectors.flink.table.descriptors;

import io.pravega.client.ClientConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.connectors.flink.AbstractStreamingReaderBuilder;
import io.pravega.connectors.flink.AbstractStreamingWriterBuilder;
import io.pravega.connectors.flink.FlinkPravegaInputFormat;
import io.pravega.connectors.flink.FlinkPravegaOutputFormat;
import io.pravega.connectors.flink.FlinkPravegaTableSink;
import io.pravega.connectors.flink.FlinkPravegaWriter;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.PravegaWriterMode;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import io.pravega.connectors.flink.watermark.AssignerWithTimeWindows;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

@Deprecated
public class Pravega
extends ConnectorDescriptor {
    // Connector type name handed to the ConnectorDescriptor super constructor.
    public static final String CONNECTOR_TYPE_VALUE_PRAVEGA = "pravega";
    // Version written under the "connector.version" property.
    public static final int CONNECTOR_VERSION_VALUE = 1;
    // Key of the boolean metrics flag taken from whichever builder is configured.
    public static final String CONNECTOR_METRICS = "connector.metrics";

    // --- Connection configuration keys (written by populateConnectionConfig) ---
    public static final String CONNECTOR_CONNECTION_CONFIG = "connector.connection-config";
    public static final String CONNECTOR_CONNECTION_CONFIG_CONTROLLER_URI = "connector.connection-config.controller-uri";
    public static final String CONNECTOR_CONNECTION_CONFIG_DEFAULT_SCOPE = "connector.connection-config.default-scope";
    public static final String CONNECTOR_CONNECTION_CONFIG_SECURITY = "connector.connection-config.security";
    public static final String CONNECTOR_CONNECTION_CONFIG_SECURITY_AUTH_TYPE = "connector.connection-config.security.auth-type";
    public static final String CONNECTOR_CONNECTION_CONFIG_SECURITY_AUTH_TOKEN = "connector.connection-config.security.auth-token";
    public static final String CONNECTOR_CONNECTION_CONFIG_SECURITY_VALIDATE_HOSTNAME = "connector.connection-config.security.validate-hostname";
    public static final String CONNECTOR_CONNECTION_CONFIG_SECURITY_TRUST_STORE = "connector.connection-config.security.trust-store";

    // --- Reader keys (written by populateReaderProperties) ---
    // CONNECTOR_READER itself is stored as a boolean marker set to true.
    public static final String CONNECTOR_READER = "connector.reader";
    // Prefix of the indexed per-stream entries; the four constants that follow are the
    // sub-keys of each entry (note they are relative names, not full property paths).
    public static final String CONNECTOR_READER_STREAM_INFO = "connector.reader.stream-info";
    public static final String CONNECTOR_READER_STREAM_INFO_SCOPE = "scope";
    public static final String CONNECTOR_READER_STREAM_INFO_STREAM = "stream";
    public static final String CONNECTOR_READER_STREAM_INFO_START_STREAMCUT = "start-streamcut";
    public static final String CONNECTOR_READER_STREAM_INFO_END_STREAMCUT = "end-streamcut";
    public static final String CONNECTOR_READER_READER_GROUP = "connector.reader.reader-group";
    public static final String CONNECTOR_READER_READER_GROUP_UID = "connector.reader.reader-group.uid";
    public static final String CONNECTOR_READER_READER_GROUP_SCOPE = "connector.reader.reader-group.scope";
    public static final String CONNECTOR_READER_READER_GROUP_NAME = "connector.reader.reader-group.name";
    // The three interval keys below are written as long values in milliseconds.
    public static final String CONNECTOR_READER_READER_GROUP_REFRESH_INTERVAL = "connector.reader.reader-group.refresh-interval";
    public static final String CONNECTOR_READER_READER_GROUP_EVENT_READ_TIMEOUT_INTERVAL = "connector.reader.reader-group.event-read-timeout-interval";
    public static final String CONNECTOR_READER_READER_GROUP_CHECKPOINT_INITIATE_TIMEOUT_INTERVAL = "connector.reader.reader-group.checkpoint-initiate-timeout-interval";
    // Holds the class of the user-supplied timestamp assigner, when one is configured.
    public static final String CONNECTOR_READER_USER_TIMESTAMP_ASSIGNER = "connector.reader.user.timestamp-assigner";

    // --- Writer keys (written by populateWriterProperties) ---
    // CONNECTOR_WRITER itself is stored as a boolean marker set to true.
    public static final String CONNECTOR_WRITER = "connector.writer";
    public static final String CONNECTOR_WRITER_SCOPE = "connector.writer.scope";
    public static final String CONNECTOR_WRITER_STREAM = "connector.writer.stream";
    public static final String CONNECTOR_WRITER_MODE = "connector.writer.mode";
    // Values stored under CONNECTOR_WRITER_MODE for PravegaWriterMode.EXACTLY_ONCE / ATLEAST_ONCE.
    public static final String CONNECTOR_WRITER_MODE_VALUE_EXACTLY_ONCE = "exactly_once";
    public static final String CONNECTOR_WRITER_MODE_VALUE_ATLEAST_ONCE = "atleast_once";
    // Written as a long value in milliseconds.
    public static final String CONNECTOR_WRITER_TXN_LEASE_RENEWAL_INTERVAL = "connector.writer.txn-lease-renewal-interval";
    public static final String CONNECTOR_WRITER_ENABLE_WATERMARK = "connector.writer.enable-watermark";
    // NOTE(review): "FILED" in the identifier looks like a typo for "FIELD"; the constant is
    // public, so renaming it would break external references. The key string itself is spelled correctly.
    public static final String CONNECTOR_WRITER_ROUTING_KEY_FILED_NAME = "connector.writer.routingkey-field-name";

    // Both builders stay null until the matching factory method on this descriptor is called;
    // toConnectorProperties() rejects the descriptor when neither has been set.
    private TableSourceReaderBuilder tableSourceReaderBuilder = null;
    private TableSinkWriterBuilder tableSinkWriterBuilder = null;

    /**
     * Creates the descriptor, registering it with the connector type
     * {@link #CONNECTOR_TYPE_VALUE_PRAVEGA} and version {@link #CONNECTOR_VERSION_VALUE}.
     */
    public Pravega() {
        // The trailing flag is forwarded unchanged to ConnectorDescriptor
        // (presumably "a format is required" -- confirm against the Flink API).
        super(CONNECTOR_TYPE_VALUE_PRAVEGA, CONNECTOR_VERSION_VALUE, true);
    }

    /**
     * Flattens the configured reader and/or writer builder into connector properties.
     *
     * <p>The connection configuration and the metrics flag are taken from the reader builder
     * when one is present, otherwise from the writer builder. Reader and writer specific
     * properties are then added for each builder that has been configured.
     *
     * @return the collected properties as a string map
     * @throws ValidationException if neither a reader nor a writer builder has been configured
     */
    protected Map<String, String> toConnectorProperties() {
        final boolean hasReader = this.tableSourceReaderBuilder != null;
        final boolean hasWriter = this.tableSinkWriterBuilder != null;

        DescriptorProperties properties = new DescriptorProperties();
        properties.putString("connector.version", String.valueOf(CONNECTOR_VERSION_VALUE));

        if (!hasReader && !hasWriter) {
            throw new ValidationException("Missing both reader and writer configurations.");
        }

        // The reader builder takes precedence as the source of the shared settings.
        PravegaConfig pravegaConfig;
        if (hasReader) {
            pravegaConfig = this.tableSourceReaderBuilder.getPravegaConfig();
        } else {
            pravegaConfig = this.tableSinkWriterBuilder.getPravegaConfig();
        }
        this.populateConnectionConfig(pravegaConfig, properties);

        boolean metrics;
        if (hasReader) {
            metrics = this.tableSourceReaderBuilder.isMetricsEnabled();
        } else {
            metrics = this.tableSinkWriterBuilder.isMetricsEnabled();
        }
        properties.putBoolean(CONNECTOR_METRICS, metrics);

        if (hasReader) {
            this.populateReaderProperties(properties);
        }
        if (hasWriter) {
            this.populateWriterProperties(properties);
        }
        return properties.asMap();
    }

    /**
     * Writes the connection settings of {@code pravegaConfig} into {@code properties}.
     *
     * <p>The controller URI and the validate-hostname flag are always written. The default
     * scope, authentication type, authentication token and trust store are written only when
     * they are non-null and non-empty; the authentication values are only looked at when the
     * client configuration carries credentials.
     *
     * @param pravegaConfig source of the client configuration and default scope
     * @param properties    target property container
     */
    private void populateConnectionConfig(PravegaConfig pravegaConfig, DescriptorProperties properties) {
        // Resolve the client configuration once instead of re-requesting it for every property.
        ClientConfig clientConfig = pravegaConfig.getClientConfig();

        properties.putString(CONNECTOR_CONNECTION_CONFIG_CONTROLLER_URI, clientConfig.getControllerURI().toString());
        putStringIfNotEmpty(properties, CONNECTOR_CONNECTION_CONFIG_DEFAULT_SCOPE, pravegaConfig.getDefaultScope());

        if (clientConfig.getCredentials() != null) {
            putStringIfNotEmpty(properties, CONNECTOR_CONNECTION_CONFIG_SECURITY_AUTH_TYPE,
                    clientConfig.getCredentials().getAuthenticationType());
            putStringIfNotEmpty(properties, CONNECTOR_CONNECTION_CONFIG_SECURITY_AUTH_TOKEN,
                    clientConfig.getCredentials().getAuthenticationToken());
        }

        properties.putBoolean(CONNECTOR_CONNECTION_CONFIG_SECURITY_VALIDATE_HOSTNAME, clientConfig.isValidateHostName());
        putStringIfNotEmpty(properties, CONNECTOR_CONNECTION_CONFIG_SECURITY_TRUST_STORE, clientConfig.getTrustStore());
    }

    /** Stores {@code value} under {@code key} unless the value is null or empty. */
    private static void putStringIfNotEmpty(DescriptorProperties properties, String key, String value) {
        if (value != null && !value.isEmpty()) {
            properties.putString(key, value);
        }
    }

    /**
     * Writes the writer-side settings of the configured sink builder into {@code properties}.
     *
     * <p>Records the writer marker flag, the resolved stream's scope and name, the writer mode
     * (only for {@code ATLEAST_ONCE} and {@code EXACTLY_ONCE}; any other mode leaves the key
     * unset), the watermark flag, the transaction lease renewal period in milliseconds and,
     * when set, the routing key field name.
     *
     * @param properties target property container
     */
    private void populateWriterProperties(DescriptorProperties properties) {
        TableSinkWriterBuilder writer = this.tableSinkWriterBuilder;

        properties.putBoolean(CONNECTOR_WRITER, true);
        properties.putString(CONNECTOR_WRITER_SCOPE, writer.resolveStream().getScope());
        properties.putString(CONNECTOR_WRITER_STREAM, writer.resolveStream().getStreamName());

        // Translate the enum into its property value; unmapped modes produce no entry.
        String modeValue = null;
        if (writer.writerMode == PravegaWriterMode.ATLEAST_ONCE) {
            modeValue = CONNECTOR_WRITER_MODE_VALUE_ATLEAST_ONCE;
        } else if (writer.writerMode == PravegaWriterMode.EXACTLY_ONCE) {
            modeValue = CONNECTOR_WRITER_MODE_VALUE_EXACTLY_ONCE;
        }
        if (modeValue != null) {
            properties.putString(CONNECTOR_WRITER_MODE, modeValue);
        }

        properties.putBoolean(CONNECTOR_WRITER_ENABLE_WATERMARK, writer.enableWatermark);
        properties.putLong(CONNECTOR_WRITER_TXN_LEASE_RENEWAL_INTERVAL, writer.txnLeaseRenewalPeriod.toMilliseconds());

        String routingKeyField = writer.routingKeyFieldName;
        if (routingKeyField != null) {
            properties.putString(CONNECTOR_WRITER_ROUTING_KEY_FILED_NAME, routingKeyField);
        }
    }

    /**
     * Writes the reader-side settings of the configured source builder into {@code properties}.
     *
     * <p>Records the reader marker flag, one indexed stream-info entry (scope, stream name,
     * start and end stream cut as text) per stream with a starting stream cut, the reader group
     * uid, scope and name, the group refresh, event-read and checkpoint-initiate timeouts in
     * milliseconds and, when a timestamp assigner was configured, the assigner's class.
     *
     * @param properties target property container
     * @throws TableException if the serialized timestamp assigner cannot be restored; the
     *         original failure is attached as the cause
     */
    private void populateReaderProperties(DescriptorProperties properties) {
        TableSourceReaderBuilder reader = this.tableSourceReaderBuilder;
        properties.putBoolean(CONNECTOR_READER, true);

        AbstractStreamingReaderBuilder.ReaderGroupInfo readerGroupInfo = reader.buildReaderGroupInfo();
        Map<Stream, StreamCut> startStreamCuts = readerGroupInfo.getReaderGroupConfig().getStartingStreamCuts();
        Map<Stream, StreamCut> endStreamCuts = readerGroupInfo.getReaderGroupConfig().getEndingStreamCuts();

        // One row per stream, in the same column order as the sub-key list below.
        List<List<String>> streamInfo = new ArrayList<>(startStreamCuts.size());
        for (Map.Entry<Stream, StreamCut> entry : startStreamCuts.entrySet()) {
            Stream stream = entry.getKey();
            StreamCut startStreamCut = entry.getValue();
            StreamCut endStreamCut = endStreamCuts.get(stream);
            streamInfo.add(Arrays.asList(stream.getScope(), stream.getStreamName(), startStreamCut.asText(), endStreamCut.asText()));
        }
        properties.putIndexedFixedProperties(
                CONNECTOR_READER_STREAM_INFO,
                Arrays.asList(
                        CONNECTOR_READER_STREAM_INFO_SCOPE,
                        CONNECTOR_READER_STREAM_INFO_STREAM,
                        CONNECTOR_READER_STREAM_INFO_START_STREAMCUT,
                        CONNECTOR_READER_STREAM_INFO_END_STREAMCUT),
                streamInfo);

        // Fall back to a generated uid when the caller did not assign one.
        String uid = Optional.ofNullable(reader.uid).orElseGet(reader::generateUid);
        properties.putString(CONNECTOR_READER_READER_GROUP_UID, uid);
        properties.putString(CONNECTOR_READER_READER_GROUP_SCOPE, readerGroupInfo.getReaderGroupScope());
        properties.putString(CONNECTOR_READER_READER_GROUP_NAME, readerGroupInfo.getReaderGroupName());
        properties.putLong(CONNECTOR_READER_READER_GROUP_REFRESH_INTERVAL, readerGroupInfo.getReaderGroupConfig().getGroupRefreshTimeMillis());
        properties.putLong(CONNECTOR_READER_READER_GROUP_EVENT_READ_TIMEOUT_INTERVAL, reader.eventReadTimeout.toMilliseconds());
        properties.putLong(CONNECTOR_READER_READER_GROUP_CHECKPOINT_INITIATE_TIMEOUT_INTERVAL, reader.checkpointInitiateTimeout.toMilliseconds());

        if (reader.getAssignerWithTimeWindows() != null) {
            try {
                // The assigner is held in serialized form; restore it only to learn its class.
                AssignerWithTimeWindows<?> assigner = (AssignerWithTimeWindows<?>) reader.getAssignerWithTimeWindows()
                        .deserializeValue(getClass().getClassLoader());
                properties.putClass(CONNECTOR_READER_USER_TIMESTAMP_ASSIGNER, assigner.getClass());
            } catch (Exception e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new TableException(e.getMessage(), e);
            }
        }
    }

    /**
     * Creates a fresh reader builder, stores it on this descriptor and returns it.
     *
     * <p>Each call replaces any reader builder created by an earlier call.
     *
     * @return the newly created reader builder
     */
    public TableSourceReaderBuilder tableSourceReaderBuilder() {
        TableSourceReaderBuilder created = new TableSourceReaderBuilder();
        this.tableSourceReaderBuilder = created;
        return created;
    }

    /**
     * Creates a fresh writer builder, stores it on this descriptor and returns it.
     *
     * <p>Each call replaces any writer builder created by an earlier call.
     *
     * @return the newly created writer builder
     */
    public TableSinkWriterBuilder tableSinkWriterBuilder() {
        TableSinkWriterBuilder created = new TableSinkWriterBuilder();
        this.tableSinkWriterBuilder = created;
        return created;
    }

    /**
     * Writer builder for {@link Row} records that additionally collects a routing key field
     * name and a serialization schema, and turns them into a sink function or output format.
     *
     * @param <T> not referenced by any member of this class
     */
    public static class TableSinkWriterBuilder<T extends AbstractStreamingWriterBuilder>
    extends AbstractStreamingWriterBuilder<Row, TableSinkWriterBuilder> {
        /** Name of the row field used for routing; {@code null} means no event router is created. */
        private String routingKeyFieldName;
        /** Schema used to serialize rows; must be set before creating a sink or output format. */
        private SerializationSchema<Row> serializationSchema;

        @Override
        protected TableSinkWriterBuilder builder() {
            return this;
        }

        /**
         * Sets the name of the row field to route events by.
         *
         * @param fieldName the routing key field name
         * @return this builder
         */
        public TableSinkWriterBuilder withRoutingKeyField(String fieldName) {
            this.routingKeyFieldName = fieldName;
            return this.builder();
        }

        /**
         * Sets the schema used to serialize rows.
         *
         * @param serializationSchema the row serialization schema
         * @return this builder
         */
        public TableSinkWriterBuilder withSerializationSchema(SerializationSchema<Row> serializationSchema) {
            this.serializationSchema = serializationSchema;
            return this.builder();
        }

        /**
         * Creates the streaming sink function for the given table schema.
         *
         * @param tableSchema schema supplying the field names and types for the event router
         * @return the configured writer
         * @throws IllegalStateException if no serialization schema has been provided
         */
        public FlinkPravegaWriter<Row> createSinkFunction(TableSchema tableSchema) {
            Preconditions.checkState(this.serializationSchema != null, "The serializationSchema must be provided.");
            FlinkPravegaTableSink.RowBasedRouter router = this.routerFor(tableSchema);
            return this.createSinkFunction(this.serializationSchema, router);
        }

        /**
         * Creates the batch output format for the given table schema.
         *
         * @param tableSchema schema supplying the field names and types for the event router
         * @return the configured output format
         * @throws IllegalStateException if no serialization schema has been provided
         */
        public FlinkPravegaOutputFormat<Row> createOutputFormat(TableSchema tableSchema) {
            Preconditions.checkState(this.serializationSchema != null, "The serializationSchema must be provided.");
            FlinkPravegaTableSink.RowBasedRouter router = this.routerFor(tableSchema);
            return new FlinkPravegaOutputFormat<Row>(
                    this.getPravegaConfig().getClientConfig(),
                    this.resolveStream(),
                    this.serializationSchema,
                    router);
        }

        /** Builds the row-based event router, or returns {@code null} when no routing key field is set. */
        private FlinkPravegaTableSink.RowBasedRouter routerFor(TableSchema tableSchema) {
            if (this.routingKeyFieldName == null) {
                return null;
            }
            return new FlinkPravegaTableSink.RowBasedRouter(
                    this.routingKeyFieldName, tableSchema.getFieldNames(), tableSchema.getFieldDataTypes());
        }
    }

    /**
     * Reader builder for {@link Row} records that additionally collects a deserialization
     * schema and an optional timestamp assigner, and can create a batch input format.
     *
     * @param <T> not referenced by any member of this class
     */
    public static class TableSourceReaderBuilder<T extends AbstractStreamingReaderBuilder>
    extends AbstractStreamingReaderBuilder<Row, TableSourceReaderBuilder> {
        /** Schema used to deserialize rows; must be set before building the input format. */
        private DeserializationSchema<Row> deserializationSchema;
        /** Timestamp assigner kept in serialized form; {@code null} when none was configured. */
        private SerializedValue<AssignerWithTimeWindows<Row>> assignerWithTimeWindows;

        @Override
        protected TableSourceReaderBuilder builder() {
            return this;
        }

        @Override
        protected DeserializationSchema<Row> getDeserializationSchema() {
            return this.deserializationSchema;
        }

        @Override
        protected SerializedValue<AssignerWithTimeWindows<Row>> getAssignerWithTimeWindows() {
            return this.assignerWithTimeWindows;
        }

        /**
         * Sets the schema used to deserialize rows.
         *
         * @param deserializationSchema the row deserialization schema
         * @return this builder
         */
        public TableSourceReaderBuilder withDeserializationSchema(DeserializationSchema<Row> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            return this;
        }

        /**
         * Sets the timestamp assigner; it is closure-cleaned and then stored in serialized form.
         *
         * @param assignerWithTimeWindows the assigner to use
         * @return this builder
         * @throws IllegalArgumentException if serializing the assigner fails with an {@link IOException}
         */
        public TableSourceReaderBuilder withTimestampAssigner(AssignerWithTimeWindows<Row> assignerWithTimeWindows) {
            try {
                ClosureCleaner.clean(assignerWithTimeWindows, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
                this.assignerWithTimeWindows = new SerializedValue<>(assignerWithTimeWindows);
            } catch (IOException serializationFailure) {
                throw new IllegalArgumentException("The given assigner is not serializable", serializationFailure);
            }
            return this;
        }

        /**
         * Creates the batch input format from the resolved streams and the client configuration.
         *
         * @return the configured input format
         * @throws IllegalStateException if no deserialization schema has been provided
         */
        public FlinkPravegaInputFormat<Row> buildInputFormat() {
            Preconditions.checkState(this.deserializationSchema != null, "The deserializationSchema must be provided.");
            List<StreamWithBoundaries> resolvedStreams = this.resolveStreams();
            ClientConfig config = this.getPravegaConfig().getClientConfig();
            return new FlinkPravegaInputFormat<Row>(config, resolvedStreams, this.deserializationSchema);
        }
    }
}

