/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.source;

import com.google.auto.service.AutoService;
import java.util.Arrays;
import java.util.regex.Pattern;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicListDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.format.PulsarCanalDecorator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/**
 * SeaTunnel source plugin that reads {@link SeaTunnelRow}s from Apache Pulsar.
 *
 * <p>{@link #prepare(Config)} translates the plugin configuration into the admin, client and
 * consumer settings, the start/stop cursors, the topic discoverer and the deserialization schema.
 * Those objects are later handed to the reader and split enumerator created by this class.
 */
@AutoService(SeaTunnelSource.class)
public class PulsarSource
        implements SeaTunnelSource<SeaTunnelRow, PulsarPartitionSplit, PulsarSplitEnumeratorState>,
                SupportParallelism {
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private SeaTunnelRowType typeInfo;
    private PulsarAdminConfig adminConfig;
    private PulsarClientConfig clientConfig;
    private PulsarConsumerConfig consumerConfig;
    private PulsarDiscoverer partitionDiscoverer;
    private long partitionDiscoveryIntervalMs;
    private StartCursor startCursor;
    private StopCursor stopCursor;
    protected int pollTimeout;
    protected long pollInterval;
    protected int batchSize;

    /** Returns the plugin identifier, {@code "pulsar"}. */
    @Override
    public String getPluginName() {
        return "pulsar";
    }

    /**
     * Validates the plugin configuration and initializes every field of this source.
     *
     * @param config the plugin configuration
     * @throws PrepareFailException declared by the plugin contract
     * @throws PulsarConnectorException if a mandatory option is missing, an option value is not
     *     supported, or dynamic partition discovery is combined with a bounded stop cursor
     */
    @Override
    public void prepare(Config config) throws PrepareFailException {
        CheckResult result =
                CheckConfigUtil.checkAllExists(
                        config,
                        new String[] {
                            SourceProperties.SUBSCRIPTION_NAME.key(),
                            SourceProperties.CLIENT_SERVICE_URL.key(),
                            SourceProperties.ADMIN_SERVICE_URL.key()
                        });
        if (!result.isSuccess()) {
            throw new PulsarConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    String.format(
                            "PluginName: %s, PluginType: %s, Message: %s",
                            this.getPluginName(), PluginType.SOURCE, result.getMsg()));
        }

        // Admin settings: service URL plus optional authentication.
        PulsarAdminConfig.Builder adminConfigBuilder =
                PulsarAdminConfig.builder()
                        .adminUrl(config.getString(SourceProperties.ADMIN_SERVICE_URL.key()));
        PropertiesUtil.setOption(
                config,
                SourceProperties.AUTH_PLUGIN_CLASS.key(),
                config::getString,
                adminConfigBuilder::authPluginClassName);
        PropertiesUtil.setOption(
                config,
                SourceProperties.AUTH_PARAMS.key(),
                config::getString,
                adminConfigBuilder::authParams);
        this.adminConfig = adminConfigBuilder.build();

        // Client settings: service URL plus the same optional authentication.
        PulsarClientConfig.Builder clientConfigBuilder =
                PulsarClientConfig.builder()
                        .serviceUrl(config.getString(SourceProperties.CLIENT_SERVICE_URL.key()));
        PropertiesUtil.setOption(
                config,
                SourceProperties.AUTH_PLUGIN_CLASS.key(),
                config::getString,
                clientConfigBuilder::authPluginClassName);
        PropertiesUtil.setOption(
                config,
                SourceProperties.AUTH_PARAMS.key(),
                config::getString,
                clientConfigBuilder::authParams);
        this.clientConfig = clientConfigBuilder.build();

        PulsarConsumerConfig.Builder consumerConfigBuilder =
                PulsarConsumerConfig.builder()
                        .subscriptionName(
                                config.getString(SourceProperties.SUBSCRIPTION_NAME.key()));
        this.consumerConfig = consumerConfigBuilder.build();

        // Tunables that fall back to the defaults declared in SourceProperties.
        PropertiesUtil.setOption(
                config,
                SourceProperties.TOPIC_DISCOVERY_INTERVAL.key(),
                SourceProperties.TOPIC_DISCOVERY_INTERVAL.defaultValue(),
                config::getLong,
                v -> this.partitionDiscoveryIntervalMs = v);
        PropertiesUtil.setOption(
                config,
                SourceProperties.POLL_TIMEOUT.key(),
                SourceProperties.POLL_TIMEOUT.defaultValue(),
                config::getInt,
                v -> this.pollTimeout = v);
        PropertiesUtil.setOption(
                config,
                SourceProperties.POLL_INTERVAL.key(),
                SourceProperties.POLL_INTERVAL.defaultValue(),
                config::getLong,
                v -> this.pollInterval = v);
        PropertiesUtil.setOption(
                config,
                SourceProperties.POLL_BATCH_SIZE.key(),
                SourceProperties.POLL_BATCH_SIZE.defaultValue(),
                config::getInt,
                v -> this.batchSize = v);

        this.setStartCursor(config);
        this.setStopCursor(config);
        this.setPartitionDiscoverer(config);
        this.setDeserialization(config);

        // Pattern-based discovery with a positive interval is rejected for bounded reads.
        if (this.partitionDiscoverer instanceof TopicPatternDiscoverer
                && this.partitionDiscoveryIntervalMs > 0L
                && Boundedness.BOUNDED == this.stopCursor.getBoundedness()) {
            throw new PulsarConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    "Bounded streams do not support dynamic partition discovery.");
        }
    }

    /**
     * Selects the start cursor from the configured startup mode.
     *
     * @param config the plugin configuration
     * @throws PulsarConnectorException if the mode is {@code TIMESTAMP} without a timestamp, or
     *     the mode is not handled
     */
    private void setStartCursor(Config config) {
        SourceProperties.StartMode startMode =
                PropertiesUtil.getEnum(
                        config,
                        SourceProperties.CURSOR_STARTUP_MODE.key(),
                        SourceProperties.StartMode.class,
                        SourceProperties.CURSOR_STARTUP_MODE.defaultValue());
        switch (startMode) {
            case EARLIEST:
                this.startCursor = StartCursor.earliest();
                break;
            case LATEST:
                this.startCursor = StartCursor.latest();
                break;
            case SUBSCRIPTION:
                SubscriptionStartCursor.CursorResetStrategy resetStrategy =
                        PropertiesUtil.getEnum(
                                config,
                                SourceProperties.CURSOR_RESET_MODE.key(),
                                SubscriptionStartCursor.CursorResetStrategy.class,
                                SubscriptionStartCursor.CursorResetStrategy.LATEST);
                this.startCursor = StartCursor.subscription(resetStrategy);
                break;
            case TIMESTAMP:
                this.startCursor =
                        StartCursor.timestamp(
                                requireTimestamp(
                                        config,
                                        SourceProperties.CURSOR_STARTUP_TIMESTAMP.key(),
                                        SourceProperties.CURSOR_STARTUP_MODE.key()));
                break;
            default:
                throw new PulsarConnectorException(
                        SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
                        String.format("The %s mode is not supported.", startMode));
        }
    }

    /**
     * Selects the stop cursor from the configured stop mode.
     *
     * @param config the plugin configuration
     * @throws PulsarConnectorException if the mode is {@code TIMESTAMP} without a timestamp, or
     *     the mode is not handled
     */
    private void setStopCursor(Config config) {
        SourceProperties.StopMode stopMode =
                PropertiesUtil.getEnum(
                        config,
                        SourceProperties.CURSOR_STOP_MODE.key(),
                        SourceProperties.StopMode.class,
                        SourceProperties.CURSOR_STOP_MODE.defaultValue());
        switch (stopMode) {
            case LATEST:
                this.stopCursor = StopCursor.latest();
                break;
            case NEVER:
                this.stopCursor = StopCursor.never();
                break;
            case TIMESTAMP:
                // Read the STOP timestamp; the value was previously taken from the startup
                // timestamp key, so the configured stop timestamp was never used.
                this.stopCursor =
                        StopCursor.timestamp(
                                requireTimestamp(
                                        config,
                                        SourceProperties.CURSOR_STOP_TIMESTAMP.key(),
                                        SourceProperties.CURSOR_STOP_MODE.key()));
                break;
            default:
                throw new PulsarConnectorException(
                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                        String.format("The %s mode is not supported.", stopMode));
        }
    }

    /**
     * Reads a mandatory timestamp option used by a {@code timestamp} cursor mode.
     *
     * @param config the plugin configuration
     * @param timestampKey key of the timestamp option to read
     * @param modeKey key of the mode option, used only in the error message
     * @return the configured timestamp
     * @throws PulsarConnectorException if the option is absent or blank
     */
    private static long requireTimestamp(Config config, String timestampKey, String modeKey) {
        // hasPath must be checked first: getString on an absent path throws a configuration
        // exception before the descriptive error below could be raised.
        if (!config.hasPath(timestampKey) || StringUtils.isBlank(config.getString(timestampKey))) {
            throw new PulsarConnectorException(
                    SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
                    String.format(
                            "The '%s' property is required when the '%s' is 'timestamp'.",
                            timestampKey, modeKey));
        }
        return config.getLong(timestampKey);
    }

    /**
     * Builds the topic discoverer from either the topic list or the topic pattern option.
     *
     * <p>When both options are present and non-blank, the pattern takes precedence because it is
     * evaluated last.
     *
     * @param config the plugin configuration
     * @throws PulsarConnectorException if neither option yields a discoverer
     */
    private void setPartitionDiscoverer(Config config) {
        String topicKey = SourceProperties.TOPIC.key();
        if (config.hasPath(topicKey)) {
            String topic = config.getString(topicKey);
            if (StringUtils.isNotBlank(topic)) {
                this.partitionDiscoverer =
                        new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ",")));
            }
        }
        String patternKey = SourceProperties.TOPIC_PATTERN.key();
        if (config.hasPath(patternKey)) {
            String topicPattern = config.getString(patternKey);
            if (StringUtils.isNotBlank(topicPattern)) {
                this.partitionDiscoverer =
                        new TopicPatternDiscoverer(Pattern.compile(topicPattern));
            }
        }
        if (this.partitionDiscoverer == null) {
            throw new PulsarConnectorException(
                    SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
                    String.format(
                            "The properties '%s' or '%s' is required.", topicKey, patternKey));
        }
    }

    /**
     * Chooses the row type and the deserialization schema.
     *
     * <p>With a schema option the row type is built from the configuration and the format option
     * ({@code json} or {@code canal_json}) selects the deserializer; without one, a simple text
     * schema is paired with the JSON deserializer.
     *
     * @param config the plugin configuration
     * @throws SeaTunnelJsonFormatException if the configured format is not supported
     */
    private void setDeserialization(Config config) {
        if (!config.hasPath(SourceProperties.SCHEMA.key())) {
            this.typeInfo = CatalogTableUtil.buildSimpleTextSchema();
            this.deserializationSchema = new JsonDeserializationSchema(false, false, this.typeInfo);
            return;
        }
        this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
        String format = SourceProperties.FORMAT.defaultValue();
        if (config.hasPath(SourceProperties.FORMAT.key())) {
            format = config.getString(SourceProperties.FORMAT.key());
        }
        switch (format) {
            case "json":
                this.deserializationSchema =
                        new JsonDeserializationSchema(false, false, this.typeInfo);
                break;
            case "canal_json":
                this.deserializationSchema =
                        new PulsarCanalDecorator(
                                CanalJsonDeserializationSchema.builder(this.typeInfo)
                                        .setIgnoreParseErrors(true)
                                        .build());
                break;
            default:
                throw new SeaTunnelJsonFormatException(
                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
        }
    }

    /** Returns {@code UNBOUNDED} when the stop cursor never stops, {@code BOUNDED} otherwise. */
    @Override
    public Boundedness getBoundedness() {
        return this.stopCursor instanceof NeverStopCursor
                ? Boundedness.UNBOUNDED
                : Boundedness.BOUNDED;
    }

    /** Returns the row type selected while preparing the deserialization schema. */
    @Override
    public SeaTunnelRowType getProducedType() {
        return this.typeInfo;
    }

    /**
     * Creates a reader configured with the client, consumer, start cursor, deserializer and
     * polling settings of this source.
     *
     * @param readerContext the reader context supplied by the engine
     * @return a new Pulsar source reader
     */
    @Override
    public SourceReader<SeaTunnelRow, PulsarPartitionSplit> createReader(
            SourceReader.Context readerContext) throws Exception {
        return new PulsarSourceReader<SeaTunnelRow>(
                readerContext,
                this.clientConfig,
                this.consumerConfig,
                this.startCursor,
                this.deserializationSchema,
                this.pollTimeout,
                this.pollInterval,
                this.batchSize);
    }

    /**
     * Creates a fresh split enumerator.
     *
     * @param enumeratorContext the enumerator context supplied by the engine
     * @return a new Pulsar split enumerator
     */
    @Override
    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> createEnumerator(
            SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext)
            throws Exception {
        return new PulsarSplitEnumerator(
                enumeratorContext,
                this.adminConfig,
                this.partitionDiscoverer,
                this.partitionDiscoveryIntervalMs,
                this.startCursor,
                this.stopCursor,
                this.consumerConfig.getSubscriptionName());
    }

    /**
     * Creates a split enumerator seeded with the partitions recorded in a checkpoint.
     *
     * @param enumeratorContext the enumerator context supplied by the engine
     * @param checkpointState the checkpointed enumerator state
     * @return a Pulsar split enumerator aware of the already assigned partitions
     */
    @Override
    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState>
            restoreEnumerator(
                    SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext,
                    PulsarSplitEnumeratorState checkpointState)
                    throws Exception {
        return new PulsarSplitEnumerator(
                enumeratorContext,
                this.adminConfig,
                this.partitionDiscoverer,
                this.partitionDiscoveryIntervalMs,
                this.startCursor,
                this.stopCursor,
                this.consumerConfig.getSubscriptionName(),
                checkpointState.getAssignedPartitions());
    }
}

