/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource;
import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Connect source adaptor that emits each {@link SourceRecord} as a
 * {@code KeyValue<byte[], byte[]>} record, with key and value serialized by the
 * converters held by {@link AbstractKafkaConnectSource}.
 *
 * <p>When the {@code json-with-envelope} option is enabled, records are published with
 * {@link Schema#BYTES} for both key and value and {@link KeyValueEncodingType#INLINE}
 * encoding; otherwise the wrapped Kafka schemas (when present) and
 * {@link KeyValueEncodingType#SEPARATED} encoding are used.
 */
public class KafkaConnectSource
        extends AbstractKafkaConnectSource<KeyValue<byte[], byte[]>> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectSource.class);

    /** Config key that toggles the envelope mode described on the class. */
    private static final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";

    // The three constants below are not referenced anywhere in this class; they are kept
    // (now final) so the declared members of the class stay the same.
    private static final Map<String, String> PROPERTIES = Collections.emptyMap();
    private static final Optional<Long> RECORD_SEQUENCE = Optional.empty();
    private static final long FLUSH_TIMEOUT_MS = 2000L;

    /**
     * Wrapped Pulsar schemas keyed by the Kafka Connect schema they were built from.
     *
     * <p>NOTE(review): the cache is keyed by the Connect schema only, while each wrapper is
     * built with either the key or the value converter. If a key schema and a value schema
     * compare equal, they share one entry - confirm that this is acceptable when the two
     * converters differ.
     */
    private final Cache<org.apache.kafka.connect.data.Schema, KafkaSchemaWrappedSchema> readerCache =
            CacheBuilder.newBuilder()
                    .maximumSize(10000L)
                    .expireAfterAccess(30L, TimeUnit.MINUTES)
                    .build();

    /**
     * Connect-to-Avro schema translator, created once per source instead of once per record.
     * It is only used from the {@link KafkaSourceRecord} constructor, which this class invokes
     * solely from the synchronized {@link #processSourceRecord(SourceRecord)}.
     */
    private final AvroData avroData = new AvroData(1000);

    private boolean jsonWithEnvelope = false;

    /**
     * Reads the {@code json-with-envelope} option, writes the resulting
     * {@code schemas.enable} flag into {@code config}, and then delegates to the parent.
     *
     * @param config        connector configuration; this method adds/overwrites the
     *                      {@code schemas.enable} entry, so the map must be mutable
     * @param sourceContext context handed through to the parent implementation
     * @throws Exception whatever the parent {@code open} throws
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        Object envelopeSetting = config.get(JSON_WITH_ENVELOPE_CONFIG);
        if (envelopeSetting != null) {
            this.jsonWithEnvelope = Boolean.parseBoolean(envelopeSetting.toString());
            config.put("schemas.enable", this.jsonWithEnvelope);
        } else {
            // Option absent: schemas are disabled and the current flag value is left untouched.
            config.put("schemas.enable", false);
        }
        log.info("jsonWithEnvelope: {}", this.jsonWithEnvelope);
        super.open(config, sourceContext);
    }

    /**
     * Wraps a Kafka Connect record and registers its source offset with the offset writer.
     *
     * @param srcRecord record produced by the Kafka Connect task
     * @return the wrapped record
     */
    public synchronized KafkaSourceRecord processSourceRecord(SourceRecord srcRecord) {
        // The record is built first, so a failure during conversion leaves the offset unwritten.
        KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
        this.offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
        return record;
    }

    /**
     * Record view over a {@link SourceRecord}: serialized key/value bytes plus the metadata
     * fields (topic, event time, partition id/index, schemas) declared by the parent record type.
     */
    private class KafkaSourceRecord
            extends AbstractKafkaConnectSource.AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
            implements KVRecord<byte[], byte[]> {

        KafkaSourceRecord(SourceRecord srcRecord) {
            super(srcRecord);
            final String topic = srcRecord.topic();
            final org.apache.kafka.connect.data.Schema connectKeySchema = srcRecord.keySchema();
            final org.apache.kafka.connect.data.Schema connectValueSchema = srcRecord.valueSchema();

            byte[] keyBytes = KafkaConnectSource.this.keyConverter
                    .fromConnectData(topic, connectKeySchema, srcRecord.key());
            // The record key is exposed as Base64 text; an absent key maps to Optional.empty().
            this.key = Optional.ofNullable(keyBytes).map(Base64.getEncoder()::encodeToString);

            byte[] valueBytes = KafkaConnectSource.this.valueConverter
                    .fromConnectData(topic, connectValueSchema, srcRecord.value());
            this.value = new KeyValue<>(keyBytes, valueBytes);
            this.topicName = Optional.of(topic);

            // Both cache lookups happen before either insert, matching the original ordering.
            if (connectKeySchema != null) {
                this.keySchema = KafkaConnectSource.this.readerCache.getIfPresent(connectKeySchema);
            }
            if (connectValueSchema != null) {
                this.valueSchema = KafkaConnectSource.this.readerCache.getIfPresent(connectValueSchema);
            }
            if (connectKeySchema != null && this.keySchema == null) {
                this.keySchema = new KafkaSchemaWrappedSchema(
                        KafkaConnectSource.this.avroData.fromConnectSchema(connectKeySchema),
                        KafkaConnectSource.this.keyConverter);
                KafkaConnectSource.this.readerCache.put(connectKeySchema, this.keySchema);
            }
            if (connectValueSchema != null && this.valueSchema == null) {
                this.valueSchema = new KafkaSchemaWrappedSchema(
                        KafkaConnectSource.this.avroData.fromConnectSchema(connectValueSchema),
                        KafkaConnectSource.this.valueConverter);
                KafkaConnectSource.this.readerCache.put(connectValueSchema, this.valueSchema);
            }

            this.eventTime = Optional.ofNullable(srcRecord.timestamp());
            // Partition id is the source-partition map rendered as "k1=v1,k2=v2".
            this.partitionId = Optional.of(srcRecord.sourcePartition().entrySet().stream()
                    .map(e -> e.getKey() + "=" + e.getValue())
                    .collect(Collectors.joining(",")));
            this.partitionIndex = Optional.ofNullable(srcRecord.kafkaPartition());
        }

        /**
         * @return {@code true} when the serialized value part of the key/value pair is null
         */
        @Override
        public boolean isEmpty() {
            return ((KeyValue<?, ?>) this.value).getValue() == null;
        }

        /**
         * @return {@link Schema#BYTES} in envelope mode or when the record had no key schema,
         *         otherwise the wrapped key schema
         */
        public Schema<byte[]> getKeySchema() {
            if (KafkaConnectSource.this.jsonWithEnvelope || this.keySchema == null) {
                return Schema.BYTES;
            }
            return this.keySchema;
        }

        /**
         * @return {@link Schema#BYTES} in envelope mode or when the record had no value schema,
         *         otherwise the wrapped value schema
         */
        public Schema<byte[]> getValueSchema() {
            if (KafkaConnectSource.this.jsonWithEnvelope || this.valueSchema == null) {
                return Schema.BYTES;
            }
            return this.valueSchema;
        }

        /**
         * @return {@link KeyValueEncodingType#INLINE} in envelope mode, otherwise
         *         {@link KeyValueEncodingType#SEPARATED}
         */
        public KeyValueEncodingType getKeyValueEncodingType() {
            return KafkaConnectSource.this.jsonWithEnvelope
                    ? KeyValueEncodingType.INLINE
                    : KeyValueEncodingType.SEPARATED;
        }
    }
}

