/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.PulsarIOSourceTaskContext;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
import org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectSource
implements Source<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectSource.class);
    private SourceTaskContext sourceTaskContext;
    private SourceTask sourceTask;
    private Converter keyConverter;
    private Converter valueConverter;
    private Iterator<SourceRecord> currentBatch = null;
    private CompletableFuture<Void> flushFuture;
    private OffsetBackingStore offsetStore;
    private OffsetStorageReader offsetReader;
    private OffsetStorageWriter offsetWriter;
    private IdentityHashMap<SourceRecord, SourceRecord> outstandingRecords = new IdentityHashMap();

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        HashMap<String, String> stringConfig = new HashMap<String, String>();
        config.forEach((key, value) -> {
            if (value instanceof String) {
                stringConfig.put((String)key, (String)value);
            }
        });
        this.sourceTask = ((Class)config.get("task.class")).asSubclass(SourceTask.class).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        this.keyConverter = ((Class)config.get("key.converter")).asSubclass(Converter.class).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        this.valueConverter = ((Class)config.get("value.converter")).asSubclass(Converter.class).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        this.offsetStore = new PulsarOffsetBackingStore();
        this.offsetStore.configure((WorkerConfig)new PulsarKafkaWorkerConfig(stringConfig));
        this.offsetStore.start();
        this.offsetReader = new OffsetStorageReaderImpl(this.offsetStore, "pulsar-kafka-connect-adaptor", this.keyConverter, this.valueConverter);
        this.offsetWriter = new OffsetStorageWriter(this.offsetStore, "pulsar-kafka-connect-adaptor", this.keyConverter, this.valueConverter);
        this.sourceTaskContext = new PulsarIOSourceTaskContext(this.offsetReader);
        this.sourceTask.initialize(this.sourceTaskContext);
        this.sourceTask.start(stringConfig);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Record<byte[]> read() throws Exception {
        while (true) {
            boolean hasOutstandingRecords;
            if (this.currentBatch == null) {
                this.flushFuture = new CompletableFuture();
                this.currentBatch = this.sourceTask.poll().iterator();
            }
            if (this.currentBatch.hasNext()) {
                return this.processSourceRecord(this.currentBatch.next());
            }
            KafkaConnectSource kafkaConnectSource = this;
            synchronized (kafkaConnectSource) {
                hasOutstandingRecords = !this.outstandingRecords.isEmpty();
            }
            if (hasOutstandingRecords) {
                this.flushFuture.get();
                this.flushFuture = null;
            }
            this.currentBatch = null;
        }
    }

    private synchronized Record<byte[]> processSourceRecord(final SourceRecord srcRecord) {
        this.outstandingRecords.put(srcRecord, srcRecord);
        this.offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
        return new Record<byte[]>(){

            public Optional<String> getKey() {
                byte[] keyBytes = KafkaConnectSource.this.keyConverter.fromConnectData(srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
                return Optional.of(Base64.getEncoder().encodeToString(keyBytes));
            }

            public byte[] getValue() {
                return KafkaConnectSource.this.valueConverter.fromConnectData(srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
            }

            public Optional<String> getTopicName() {
                return Optional.of(srcRecord.topic());
            }

            public Optional<Long> getEventTime() {
                return Optional.of(srcRecord.timestamp());
            }

            public Optional<String> getPartitionId() {
                String partitionId = srcRecord.sourcePartition().entrySet().stream().map(e -> (String)e.getKey() + "=" + e.getValue()).collect(Collectors.joining(","));
                return Optional.of(partitionId);
            }

            public Optional<Long> getRecordSequence() {
                return Optional.empty();
            }

            public Map<String, String> getProperties() {
                return Collections.emptyMap();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void ack() {
                boolean canComplete;
                KafkaConnectSource kafkaConnectSource = KafkaConnectSource.this;
                synchronized (kafkaConnectSource) {
                    KafkaConnectSource.this.outstandingRecords.remove(srcRecord);
                    canComplete = KafkaConnectSource.this.outstandingRecords.isEmpty();
                }
                if (canComplete && KafkaConnectSource.this.flushFuture != null) {
                    KafkaConnectSource.this.flushFuture.complete(null);
                }
            }

            public void fail() {
                if (KafkaConnectSource.this.flushFuture != null) {
                    KafkaConnectSource.this.flushFuture.completeExceptionally(new Exception("Sink Error"));
                }
            }
        };
    }

    public void close() throws Exception {
        this.sourceTask.stop();
    }

    public SourceTask getSourceTask() {
        return this.sourceTask;
    }

    public OffsetStorageWriter getOffsetWriter() {
        return this.offsetWriter;
    }
}

