/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.PulsarIOSourceTaskContext;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
import org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectSource
implements Source<KeyValue<byte[], byte[]>> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectSource.class);
    private SourceTaskContext sourceTaskContext;
    private SourceTask sourceTask;
    private Converter keyConverter;
    private Converter valueConverter;
    private Iterator<SourceRecord> currentBatch = null;
    private CompletableFuture<Void> flushFuture;
    private OffsetBackingStore offsetStore;
    private OffsetStorageReader offsetReader;
    private String topicNamespace;
    private OffsetStorageWriter offsetWriter;
    private AtomicInteger outstandingRecords = new AtomicInteger(0);
    private static Map<String, String> PROPERTIES = Collections.emptyMap();
    private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
    private static long FLUSH_TIMEOUT_MS = 2000L;

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        HashMap<String, String> stringConfig = new HashMap<String, String>();
        config.forEach((key, value) -> {
            if (value instanceof String) {
                stringConfig.put((String)key, (String)value);
            }
        });
        this.sourceTask = Class.forName((String)stringConfig.get("task.class")).asSubclass(SourceTask.class).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        this.topicNamespace = (String)stringConfig.get("topic.namespace");
        this.keyConverter = Class.forName((String)stringConfig.get("key.converter")).asSubclass(Converter.class).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        this.valueConverter = Class.forName((String)stringConfig.get("value.converter")).asSubclass(Converter.class).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        this.keyConverter.configure(config, true);
        this.valueConverter.configure(config, false);
        this.offsetStore = new PulsarOffsetBackingStore();
        this.offsetStore.configure((WorkerConfig)new PulsarKafkaWorkerConfig(stringConfig));
        this.offsetStore.start();
        this.offsetReader = new OffsetStorageReaderImpl(this.offsetStore, "pulsar-kafka-connect-adaptor", this.keyConverter, this.valueConverter);
        this.offsetWriter = new OffsetStorageWriter(this.offsetStore, "pulsar-kafka-connect-adaptor", this.keyConverter, this.valueConverter);
        this.sourceTaskContext = new PulsarIOSourceTaskContext(this.offsetReader);
        this.sourceTask.initialize(this.sourceTaskContext);
        this.sourceTask.start(stringConfig);
    }

    public synchronized Record<KeyValue<byte[], byte[]>> read() throws Exception {
        while (true) {
            if (this.currentBatch == null) {
                this.flushFuture = new CompletableFuture();
                List recordList = this.sourceTask.poll();
                if (recordList == null || recordList.isEmpty()) {
                    Thread.sleep(1000L);
                    continue;
                }
                this.outstandingRecords.addAndGet(recordList.size());
                this.currentBatch = recordList.iterator();
            }
            if (this.currentBatch.hasNext()) {
                return this.processSourceRecord(this.currentBatch.next());
            }
            this.flushFuture.get();
            this.flushFuture = null;
            this.currentBatch = null;
        }
    }

    public void close() {
        this.sourceTask.stop();
    }

    private synchronized Record<KeyValue<byte[], byte[]>> processSourceRecord(SourceRecord srcRecord) {
        KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
        this.offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
        return record;
    }

    public SourceTask getSourceTask() {
        return this.sourceTask;
    }

    public OffsetStorageWriter getOffsetWriter() {
        return this.offsetWriter;
    }

    private class KafkaSourceRecord
    implements Record<KeyValue<byte[], byte[]>> {
        Optional<String> key;
        KeyValue<byte[], byte[]> value;
        Optional<String> topicName;
        Optional<Long> eventTime;
        Optional<String> partitionId;
        Optional<String> destinationTopic;

        KafkaSourceRecord(SourceRecord srcRecord) {
            byte[] keyBytes = KafkaConnectSource.this.keyConverter.fromConnectData(srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
            byte[] valueBytes = KafkaConnectSource.this.valueConverter.fromConnectData(srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
            if (keyBytes != null) {
                this.key = Optional.of(Base64.getEncoder().encodeToString(keyBytes));
            }
            this.value = new KeyValue((Object)keyBytes, (Object)valueBytes);
            this.topicName = Optional.of(srcRecord.topic());
            this.eventTime = Optional.ofNullable(srcRecord.timestamp());
            this.partitionId = Optional.of(srcRecord.sourcePartition().entrySet().stream().map(e -> (String)e.getKey() + "=" + e.getValue()).collect(Collectors.joining(",")));
            this.destinationTopic = Optional.of(KafkaConnectSource.this.topicNamespace + "/" + srcRecord.topic());
        }

        public Optional<Long> getRecordSequence() {
            return RECORD_SEQUENCE;
        }

        public Map<String, String> getProperties() {
            return PROPERTIES;
        }

        private void completedFlushOffset(Throwable error, Void result) {
            if (error != null) {
                log.error("Failed to flush offsets to storage: ", error);
                KafkaConnectSource.this.currentBatch = null;
                KafkaConnectSource.this.offsetWriter.cancelFlush();
                KafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
            } else {
                log.trace("Finished flushing offsets to storage");
                KafkaConnectSource.this.currentBatch = null;
                KafkaConnectSource.this.flushFuture.complete(null);
            }
        }

        public void ack() {
            boolean canFlush;
            boolean bl = canFlush = KafkaConnectSource.this.outstandingRecords.decrementAndGet() == 0;
            if (canFlush && KafkaConnectSource.this.flushFuture != null) {
                if (!KafkaConnectSource.this.offsetWriter.beginFlush()) {
                    log.error("When beginFlush, No offsets to commit!");
                    KafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("No Offsets Added Error when beginFlush"));
                    return;
                }
                Future doFlush = KafkaConnectSource.this.offsetWriter.doFlush(this::completedFlushOffset);
                if (doFlush == null) {
                    log.error("No offsets to commit!");
                    KafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
                    return;
                }
                try {
                    doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    KafkaConnectSource.this.sourceTask.commit();
                }
                catch (InterruptedException e) {
                    log.warn("Flush of {} offsets interrupted, cancelling", (Object)this);
                    KafkaConnectSource.this.offsetWriter.cancelFlush();
                }
                catch (ExecutionException e) {
                    log.error("Flush of {} offsets threw an unexpected exception: ", (Object)this, (Object)e);
                    KafkaConnectSource.this.offsetWriter.cancelFlush();
                }
                catch (TimeoutException e) {
                    log.error("Timed out waiting to flush {} offsets to storage", (Object)this);
                    KafkaConnectSource.this.offsetWriter.cancelFlush();
                }
            }
        }

        public void fail() {
            if (KafkaConnectSource.this.flushFuture != null) {
                KafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("Sink Error"));
            }
        }

        public Optional<String> getKey() {
            return this.key;
        }

        public KeyValue<byte[], byte[]> getValue() {
            return this.value;
        }

        public Optional<String> getTopicName() {
            return this.topicName;
        }

        public Optional<Long> getEventTime() {
            return this.eventTime;
        }

        public Optional<String> getPartitionId() {
            return this.partitionId;
        }

        public Optional<String> getDestinationTopic() {
            return this.destinationTopic;
        }
    }
}

