/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.pulsar;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.internal.DefaultImplementation;

public class PulsarActionUtils {
    static final ConfigOption<String> VALUE_FORMAT = ConfigOptions.key((String)"value.format").stringType().noDefaultValue().withDescription("Defines the format identifier for encoding value data.");
    static final ConfigOption<String> TOPIC = ConfigOptions.key((String)"topic").stringType().noDefaultValue().withDescription("Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. Option 'topic' is required for sink.");
    static final ConfigOption<String> PULSAR_AUTH_PARAM_MAP = ConfigOptions.key((String)"pulsar.client.authParamMap").stringType().noDefaultValue().withDescription("Parameters for the authentication plugin.");
    static final ConfigOption<String> PULSAR_START_CURSOR_FROM_MESSAGE_ID = ConfigOptions.key((String)"pulsar.startCursor.fromMessageId").stringType().defaultValue((Object)"EARLIEST").withDescription("Using a unique identifier of a single message to seek the start position. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. Specially, you can set it to EARLIEST (-1, -1, -1) or LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");
    static final ConfigOption<Long> PULSAR_START_CURSOR_FORM_PUBLISH_TIME = ConfigOptions.key((String)"pulsar.startCursor.fromPublishTime").longType().noDefaultValue().withDescription("Using the message publish time to seek the start position.");
    static final ConfigOption<Boolean> PULSAR_START_CURSOR_FROM_MESSAGE_ID_INCLUSIVE = ConfigOptions.key((String)"pulsar.startCursor.fromMessageIdInclusive").booleanType().defaultValue((Object)true).withDescription("Whether to include the given message id. This option only works when the message id is not EARLIEST or LATEST.");
    static final ConfigOption<String> PULSAR_STOP_CURSOR_AT_MESSAGE_ID = ConfigOptions.key((String)"pulsar.stopCursor.atMessageId").stringType().noDefaultValue().withDescription("Stop consuming when the message id is equal or greater than the specified message id. Message that is equal to the specified message id will not be consumed. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");
    static final ConfigOption<String> PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID = ConfigOptions.key((String)"pulsar.stopCursor.afterMessageId").stringType().noDefaultValue().withDescription("Stop consuming when the message id is greater than the specified message id. Message that is equal to the specified message id will be consumed. The common format is a triple '<long>ledgerId,<long>entryId,<int>partitionIndex'. Specially, you can set it to LATEST (Long.MAX_VALUE, Long.MAX_VALUE, -1).");
    static final ConfigOption<Long> PULSAR_STOP_CURSOR_AT_EVENT_TIME = ConfigOptions.key((String)"pulsar.stopCursor.atEventTime").longType().noDefaultValue().withDescription("Stop consuming when message event time is greater than or equals the specified timestamp. Message that even time is equal to the specified timestamp will not be consumed.");
    static final ConfigOption<Long> PULSAR_STOP_CURSOR_AFTER_EVENT_TIME = ConfigOptions.key((String)"pulsar.stopCursor.afterEventTime").longType().noDefaultValue().withDescription("Stop consuming when message event time is greater than the specified timestamp. Message that even time is equal to the specified timestamp will be consumed.");
    static final ConfigOption<Boolean> PULSAR_SOURCE_UNBOUNDED = ConfigOptions.key((String)"pulsar.source.unbounded").booleanType().defaultValue((Object)true).withDescription("To specify the boundedness of a stream.");

    static PulsarSource<String> buildPulsarSource(Configuration rawConfig) {
        Configuration pulsarConfig = PulsarActionUtils.preprocessPulsarConfig(rawConfig);
        PulsarActionUtils.validatePulsarConfig(pulsarConfig);
        PulsarSourceBuilder pulsarSourceBuilder = PulsarSource.builder();
        pulsarSourceBuilder.setServiceUrl((String)pulsarConfig.get(PulsarOptions.PULSAR_SERVICE_URL)).setAdminUrl((String)pulsarConfig.get(PulsarOptions.PULSAR_ADMIN_URL)).setSubscriptionName((String)pulsarConfig.get(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME)).setTopics(Arrays.stream(((String)pulsarConfig.get(TOPIC)).split(",")).map(String::trim).collect(Collectors.toList())).setDeserializationSchema((DeserializationSchema)new SimpleStringSchema());
        pulsarConfig.getOptional(PulsarSourceOptions.PULSAR_CONSUMER_NAME).ifPresent(arg_0 -> ((PulsarSourceBuilder)pulsarSourceBuilder).setConsumerName(arg_0));
        if (pulsarConfig.contains(PULSAR_START_CURSOR_FORM_PUBLISH_TIME)) {
            Preconditions.checkArgument(!pulsarConfig.contains(PULSAR_START_CURSOR_FROM_MESSAGE_ID), "");
            pulsarSourceBuilder.setStartCursor(StartCursor.fromPublishTime((long)((Long)pulsarConfig.get(PULSAR_START_CURSOR_FORM_PUBLISH_TIME))));
        } else {
            String messageId = (String)pulsarConfig.get(PULSAR_START_CURSOR_FROM_MESSAGE_ID);
            if (messageId.equalsIgnoreCase("EARLIEST")) {
                pulsarSourceBuilder.setStartCursor(StartCursor.earliest());
            } else if (messageId.equalsIgnoreCase("LATEST")) {
                pulsarSourceBuilder.setStartCursor(StartCursor.latest());
            } else {
                StartCursor startCursor = StartCursor.fromMessageId((MessageId)PulsarActionUtils.toMessageId(messageId), (boolean)((Boolean)pulsarConfig.get(PULSAR_START_CURSOR_FROM_MESSAGE_ID_INCLUSIVE)));
                pulsarSourceBuilder.setStartCursor(startCursor);
            }
        }
        StopCursor stopCursor = StopCursor.never();
        int stopCursorSet = 0;
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AT_MESSAGE_ID)) {
            stopCursor = StopCursor.atMessageId((MessageId)PulsarActionUtils.toMessageId((String)pulsarConfig.get(PULSAR_STOP_CURSOR_AT_MESSAGE_ID)));
            ++stopCursorSet;
        }
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AFTER_MESSAGE_ID)) {
            stopCursor = StopCursor.afterMessageId((MessageId)PulsarActionUtils.toMessageId((String)pulsarConfig.get(PULSAR_STOP_CURSOR_AT_MESSAGE_ID)));
            ++stopCursorSet;
        }
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AT_EVENT_TIME)) {
            stopCursor = StopCursor.atEventTime((long)((Long)pulsarConfig.get(PULSAR_STOP_CURSOR_AT_EVENT_TIME)));
            ++stopCursorSet;
        }
        if (pulsarConfig.contains(PULSAR_STOP_CURSOR_AFTER_EVENT_TIME)) {
            stopCursor = StopCursor.atEventTime((long)((Long)pulsarConfig.get(PULSAR_STOP_CURSOR_AFTER_EVENT_TIME)));
            ++stopCursorSet;
        }
        Preconditions.checkArgument(stopCursorSet <= 1, "You can set at most one of the stop cursor options.");
        if (((Boolean)pulsarConfig.get(PULSAR_SOURCE_UNBOUNDED)).booleanValue()) {
            pulsarSourceBuilder.setUnboundedStopCursor(stopCursor);
        } else {
            pulsarSourceBuilder.setBoundedStopCursor(stopCursor);
        }
        String authPluginClassName = (String)pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME);
        if (authPluginClassName != null) {
            String authParamsString = (String)pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAMS);
            Map authParamsMap = (Map)pulsarConfig.get(PulsarOptions.PULSAR_AUTH_PARAM_MAP);
            Preconditions.checkArgument(authParamsString != null || authParamsMap != null, "You should set '%s' or '%s'", PulsarOptions.PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key());
            Preconditions.checkArgument(authParamsString == null || authParamsMap == null, "You can only set one of '%s' and '%s'", PulsarOptions.PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key());
            if (authParamsString != null) {
                pulsarSourceBuilder.setAuthentication(authPluginClassName, authParamsString);
            } else {
                pulsarSourceBuilder.setAuthentication(authPluginClassName, authParamsMap);
            }
        }
        pulsarSourceBuilder.setConfig(pulsarConfig);
        return pulsarSourceBuilder.build();
    }

    private static void validatePulsarConfig(Configuration pulsarConfig) {
        Preconditions.checkArgument(pulsarConfig.contains(VALUE_FORMAT), String.format("pulsar-conf [%s] must be specified.", VALUE_FORMAT.key()));
        Preconditions.checkArgument(pulsarConfig.contains(PulsarOptions.PULSAR_SERVICE_URL), String.format("pulsar-conf [%s] must be specified.", PulsarOptions.PULSAR_SERVICE_URL.key()));
        Preconditions.checkArgument(pulsarConfig.contains(PulsarOptions.PULSAR_ADMIN_URL), String.format("pulsar-conf [%s] must be specified.", PulsarOptions.PULSAR_ADMIN_URL.key()));
        Preconditions.checkArgument(pulsarConfig.contains(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME), String.format("pulsar-conf [%s] must be specified.", PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME.key()));
        Preconditions.checkArgument(pulsarConfig.contains(TOPIC), String.format("pulsar-conf [%s] must be specified.", TOPIC.key()));
    }

    private static MessageId toMessageId(String messageIdString) {
        if (messageIdString.equalsIgnoreCase("EARLIEST")) {
            return MessageId.earliest;
        }
        if (messageIdString.equalsIgnoreCase("LATEST")) {
            return MessageId.latest;
        }
        String[] splits = messageIdString.split(",");
        Preconditions.checkArgument(splits.length == 3, "Please use format '<long>ledgerId,<long>entryId,<int>partitionIndex' for message id");
        return DefaultImplementation.getDefaultImplementation().newMessageId(Long.parseLong(splits[0].trim()), Long.parseLong(splits[1].trim()), Integer.parseInt(splits[2].trim()));
    }

    static SourceConfiguration toSourceConfiguration(Configuration rawConfig) {
        return new SourceConfiguration(PulsarActionUtils.preprocessPulsarConfig(rawConfig));
    }

    private static Configuration preprocessPulsarConfig(Configuration rawConfig) {
        Configuration cloned = new Configuration(rawConfig);
        if (cloned.contains(PULSAR_AUTH_PARAM_MAP)) {
            Map<String, String> authParamsMap = ParameterUtils.parseCommaSeparatedKeyValues((String)cloned.get(PULSAR_AUTH_PARAM_MAP));
            cloned.removeConfig(PULSAR_AUTH_PARAM_MAP);
            cloned.set(PulsarOptions.PULSAR_AUTH_PARAM_MAP, authParamsMap);
        }
        return cloned;
    }

    static DataFormat getDataFormat(Configuration pulsarConfig) {
        return DataFormat.fromConfigString((String)pulsarConfig.get(VALUE_FORMAT));
    }

    static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(Configuration pulsarConfig, String topic) throws PulsarClientException {
        SourceConfiguration pulsarSourceConfiguration = PulsarActionUtils.toSourceConfiguration(pulsarConfig);
        PulsarClient pulsarClient = PulsarClientFactory.createClient((PulsarConfiguration)pulsarSourceConfiguration);
        ConsumerBuilder consumerBuilder = PulsarSourceConfigUtils.createConsumerBuilder((PulsarClient)pulsarClient, (Schema)Schema.STRING, (SourceConfiguration)pulsarSourceConfiguration);
        consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        TopicPartition topicPartition = new TopicPartition(topic);
        consumerBuilder.topic(new String[]{topicPartition.getFullTopicName()});
        if (!TopicRangeUtils.isFullTopicRanges((List)topicPartition.getRanges())) {
            KeySharedPolicy.KeySharedPolicySticky policy = KeySharedPolicy.stickyHashRange().ranges(topicPartition.getPulsarRanges());
            policy.setAllowOutOfOrderDelivery(pulsarSourceConfiguration.isAllowKeySharedOutOfOrderDelivery());
            consumerBuilder.keySharedPolicy((KeySharedPolicy)policy);
        }
        Consumer consumer = consumerBuilder.subscribe();
        return new PulsarConsumerWrapper((Consumer<String>)consumer);
    }

    private static class PulsarConsumerWrapper
    implements MessageQueueSchemaUtils.ConsumerWrapper {
        private final Consumer<String> consumer;

        PulsarConsumerWrapper(Consumer<String> consumer) {
            this.consumer = consumer;
        }

        @Override
        public List<String> getRecords(String topic, int pollTimeOutMills) {
            try {
                Message message = this.consumer.receive(pollTimeOutMills, TimeUnit.MILLISECONDS);
                return message == null ? Collections.emptyList() : Collections.singletonList(message.getValue());
            }
            catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void close() throws PulsarClientException {
            this.consumer.close();
        }
    }
}

