/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka;

import io.jsonwebtoken.io.Encoders;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaAbstractSource<V>
extends PushSource<V> {
    public static final String HEADER_KAFKA_TOPIC_KEY = "__kafka_topic";
    public static final String HEADER_KAFKA_PTN_KEY = "__kafka_partition";
    public static final String HEADER_KAFKA_OFFSET_KEY = "__kafka_offset";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class);
    private volatile Consumer<Object, Object> consumer;
    private volatile boolean running = false;
    private KafkaSourceConfig kafkaSourceConfig;
    private Thread runnerThread;
    private long maxPollIntervalMs;

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
        Objects.requireNonNull(this.kafkaSourceConfig.getTopic(), "Kafka topic is not set");
        Objects.requireNonNull(this.kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set");
        Objects.requireNonNull(this.kafkaSourceConfig.getGroupId(), "Kafka consumer group id is not set");
        if (this.kafkaSourceConfig.getFetchMinBytes() <= 0L) {
            throw new IllegalArgumentException("Invalid Kafka Consumer fetchMinBytes : " + this.kafkaSourceConfig.getFetchMinBytes());
        }
        if (this.kafkaSourceConfig.isAutoCommitEnabled() && this.kafkaSourceConfig.getAutoCommitIntervalMs() <= 0L) {
            throw new IllegalArgumentException("Invalid Kafka Consumer autoCommitIntervalMs : " + this.kafkaSourceConfig.getAutoCommitIntervalMs());
        }
        if (this.kafkaSourceConfig.getSessionTimeoutMs() <= 0L) {
            throw new IllegalArgumentException("Invalid Kafka Consumer sessionTimeoutMs : " + this.kafkaSourceConfig.getSessionTimeoutMs());
        }
        if (this.kafkaSourceConfig.getHeartbeatIntervalMs() <= 0L) {
            throw new IllegalArgumentException("Invalid Kafka Consumer heartbeatIntervalMs : " + this.kafkaSourceConfig.getHeartbeatIntervalMs());
        }
        Properties props = new Properties();
        if (this.kafkaSourceConfig.getConsumerConfigProperties() != null) {
            props.putAll(this.kafkaSourceConfig.getConsumerConfigProperties());
        }
        props.put("bootstrap.servers", this.kafkaSourceConfig.getBootstrapServers());
        if (StringUtils.isNotEmpty((CharSequence)this.kafkaSourceConfig.getSecurityProtocol())) {
            props.put("security.protocol", this.kafkaSourceConfig.getSecurityProtocol());
        }
        if (StringUtils.isNotEmpty((CharSequence)this.kafkaSourceConfig.getSaslMechanism())) {
            props.put("sasl.mechanism", this.kafkaSourceConfig.getSaslMechanism());
        }
        if (StringUtils.isNotEmpty((CharSequence)this.kafkaSourceConfig.getSaslJaasConfig())) {
            props.put("sasl.jaas.config", this.kafkaSourceConfig.getSaslJaasConfig());
        }
        if (StringUtils.isNotEmpty((CharSequence)this.kafkaSourceConfig.getSslEnabledProtocols())) {
            props.put("ssl.enabled.protocols", this.kafkaSourceConfig.getSslEnabledProtocols());
        }
        if (StringUtils.isNotEmpty((CharSequence)this.kafkaSourceConfig.getSslEndpointIdentificationAlgorithm())) {
            props.put("ssl.endpoint.identification.algorithm", this.kafkaSourceConfig.getSslEndpointIdentificationAlgorithm());
        }
        if (StringUtils.isNotEmpty((CharSequence)this.kafkaSourceConfig.getSslTruststoreLocation())) {
            props.put("ssl.truststore.location", this.kafkaSourceConfig.getSslTruststoreLocation());
        }
        if (StringUtils.isNotEmpty((CharSequence)this.kafkaSourceConfig.getSslTruststorePassword())) {
            props.put("ssl.truststore.password", this.kafkaSourceConfig.getSslTruststorePassword());
        }
        props.put("group.id", this.kafkaSourceConfig.getGroupId());
        props.put("fetch.min.bytes", String.valueOf(this.kafkaSourceConfig.getFetchMinBytes()));
        props.put("enable.auto.commit", String.valueOf(this.kafkaSourceConfig.isAutoCommitEnabled()));
        props.put("auto.commit.interval.ms", String.valueOf(this.kafkaSourceConfig.getAutoCommitIntervalMs()));
        props.put("session.timeout.ms", String.valueOf(this.kafkaSourceConfig.getSessionTimeoutMs()));
        props.put("heartbeat.interval.ms", String.valueOf(this.kafkaSourceConfig.getHeartbeatIntervalMs()));
        props.put("auto.offset.reset", this.kafkaSourceConfig.getAutoOffsetReset());
        props.put("key.deserializer", this.kafkaSourceConfig.getKeyDeserializationClass());
        props.put("value.deserializer", this.kafkaSourceConfig.getValueDeserializationClass());
        this.maxPollIntervalMs = props.containsKey("max.poll.interval.ms") ? Long.parseLong(props.get("max.poll.interval.ms").toString()) : Long.parseLong(ConsumerConfig.configDef().defaultValues().get("max.poll.interval.ms").toString());
        try {
            this.consumer = new KafkaConsumer(this.beforeCreateConsumer(props));
        }
        catch (Exception ex) {
            throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex);
        }
        this.start();
    }

    protected Properties beforeCreateConsumer(Properties props) {
        return props;
    }

    public void close() throws InterruptedException {
        LOG.info("Stopping kafka source");
        this.running = false;
        if (this.runnerThread != null) {
            this.runnerThread.interrupt();
            this.runnerThread.join();
            this.runnerThread = null;
        }
        if (this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
        LOG.info("Kafka source stopped.");
    }

    public void start() {
        LOG.info("Starting subscribe kafka source on {}", (Object)this.kafkaSourceConfig.getTopic());
        this.consumer.subscribe(Collections.singletonList(this.kafkaSourceConfig.getTopic()));
        this.runnerThread = new Thread(() -> {
            LOG.info("Kafka source started.");
            while (this.running) {
                try {
                    ConsumerRecords consumerRecords = this.consumer.poll(Duration.ofSeconds(1L));
                    CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()];
                    int index = 0;
                    for (ConsumerRecord consumerRecord : consumerRecords) {
                        KafkaRecord<V> record = this.buildRecord((ConsumerRecord<Object, Object>)consumerRecord);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Write record {} {} {}", new Object[]{record.getKey(), record.getValue(), record.getSchema()});
                        }
                        this.consume(record);
                        futures[index] = record.getCompletableFuture();
                        ++index;
                    }
                    if (this.kafkaSourceConfig.isAutoCommitEnabled()) continue;
                    CompletableFuture.allOf(futures).get(this.maxPollIntervalMs * 2L / 3L, TimeUnit.MILLISECONDS);
                    this.consumer.commitSync();
                }
                catch (Exception e) {
                    LOG.error("Error while processing records", (Throwable)e);
                    this.notifyError(e);
                    break;
                }
            }
        });
        this.running = true;
        this.runnerThread.setName("Kafka Source Thread");
        this.runnerThread.start();
    }

    public abstract KafkaRecord<V> buildRecord(ConsumerRecord<Object, Object> var1);

    protected Map<String, String> copyKafkaHeaders(ConsumerRecord<Object, Object> consumerRecord) {
        if (!this.kafkaSourceConfig.isCopyHeadersEnabled()) {
            return Collections.emptyMap();
        }
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put(HEADER_KAFKA_TOPIC_KEY, consumerRecord.topic());
        properties.put(HEADER_KAFKA_PTN_KEY, Integer.toString(consumerRecord.partition()));
        properties.put(HEADER_KAFKA_OFFSET_KEY, Long.toString(consumerRecord.offset()));
        for (Header header : consumerRecord.headers()) {
            properties.put(header.key(), (String)Encoders.BASE64.encode((Object)header.value()));
        }
        return properties;
    }

    protected static class KafkaRecord<V>
    implements Record<V> {
        @Generated
        private static final Logger log = LoggerFactory.getLogger(KafkaRecord.class);
        private final ConsumerRecord<?, ?> record;
        private final V value;
        private final Schema<V> schema;
        private final Map<String, String> properties;
        private final CompletableFuture<Void> completableFuture = new CompletableFuture();

        public KafkaRecord(ConsumerRecord<?, ?> record, V value, Schema<V> schema, Map<String, String> properties) {
            this.record = record;
            this.value = value;
            this.schema = schema;
            this.properties = properties;
        }

        public Optional<String> getPartitionId() {
            return Optional.of(Integer.toString(this.record.partition()));
        }

        public Optional<Integer> getPartitionIndex() {
            return Optional.of(this.record.partition());
        }

        public Optional<Long> getRecordSequence() {
            return Optional.of(this.record.offset());
        }

        public Optional<String> getKey() {
            return Optional.ofNullable(this.record.key() instanceof String ? (String)this.record.key() : null);
        }

        public V getValue() {
            return this.value;
        }

        public void ack() {
            this.completableFuture.complete(null);
        }

        public void fail() {
            this.completableFuture.completeExceptionally(new RuntimeException(String.format("Failed to process record with kafka topic: %s partition: %d offset: %d key: %s", this.record.topic(), this.record.partition(), this.record.offset(), this.getKey())));
        }

        public Schema<V> getSchema() {
            return this.schema;
        }

        public Map<String, String> getProperties() {
            return this.properties;
        }

        @Generated
        public CompletableFuture<Void> getCompletableFuture() {
            return this.completableFuture;
        }
    }

    protected static class KeyValueKafkaRecord<K, W>
    extends KafkaRecord
    implements KVRecord<K, W> {
        private final Schema<K> keySchema;
        private final Schema<W> valueSchema;

        public KeyValueKafkaRecord(ConsumerRecord<?, ?> record, KeyValue<K, W> value, Schema<K> keySchema, Schema<W> valueSchema, Map<String, String> properties) {
            super(record, value, null, properties);
            this.keySchema = keySchema;
            this.valueSchema = valueSchema;
        }

        public Schema<K> getKeySchema() {
            return this.keySchema;
        }

        public Schema<W> getValueSchema() {
            return this.valueSchema;
        }

        public KeyValueEncodingType getKeyValueEncodingType() {
            return KeyValueEncodingType.SEPARATED;
        }
    }
}

