/*
 * Decompiled with CFR 0.152.
 */
package com.github.charithe.kafka;

import com.github.charithe.kafka.EphemeralKafkaBroker;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Convenience wrapper around an {@link EphemeralKafkaBroker} that exposes its
 * configuration and offers shortcuts for creating producers/consumers and for
 * producing/consuming small batches of messages.
 */
public class KafkaHelper {
    private final EphemeralKafkaBroker broker;

    /**
     * Creates a helper bound to the given broker.
     *
     * @param broker the broker every call on the returned helper is delegated to
     * @return a new helper instance
     */
    public static KafkaHelper createFor(EphemeralKafkaBroker broker) {
        return new KafkaHelper(broker);
    }

    KafkaHelper(EphemeralKafkaBroker broker) {
        this.broker = broker;
    }

    /**
     * @return the producer configuration reported by the broker
     */
    public Properties producerConfig() {
        return broker.producerConfig();
    }

    /**
     * @return the consumer configuration reported by the broker
     */
    public Properties consumerConfig() {
        return broker.consumerConfig();
    }

    /**
     * @param enableAutoCommit flag forwarded to the broker's {@code consumerConfig(boolean)}
     * @return the consumer configuration reported by the broker for that flag
     */
    public Properties consumerConfig(boolean enableAutoCommit) {
        return broker.consumerConfig(enableAutoCommit);
    }

    /**
     * @return the Zookeeper connect string reported by the broker
     * @throws IllegalStateException if the broker does not currently report one
     */
    public String zookeeperConnectionString() {
        return broker.getZookeeperConnectString().orElseThrow(KafkaHelper::brokerNotRunning);
    }

    /**
     * @return the Zookeeper port reported by the broker
     * @throws IllegalStateException if the broker does not currently report one
     */
    public int zookeeperPort() {
        return broker.getZookeeperPort().orElseThrow(KafkaHelper::brokerNotRunning);
    }

    /**
     * @return the Kafka port reported by the broker
     * @throws IllegalStateException if the broker does not currently report one
     */
    public int kafkaPort() {
        return broker.getKafkaPort().orElseThrow(KafkaHelper::brokerNotRunning);
    }

    /** Builds the exception thrown when the broker has no value to report. */
    private static IllegalStateException brokerNotRunning() {
        return new IllegalStateException("KafkaBroker is not running");
    }

    /**
     * Creates a producer by delegating to the broker.
     *
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     * @param overrideConfig  extra configuration passed through to the broker; the
     *                        no-argument convenience methods of this class pass {@code null}
     * @return the producer created by the broker
     */
    public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer, Properties overrideConfig) {
        return broker.createProducer(keySerializer, valueSerializer, overrideConfig);
    }

    /**
     * @return a producer with {@link StringSerializer} for keys and values and no overrides
     */
    public KafkaProducer<String, String> createStringProducer() {
        return createProducer(new StringSerializer(), new StringSerializer(), null);
    }

    /**
     * @param overrideConfig extra configuration passed through to the broker
     * @return a producer with {@link StringSerializer} for keys and values
     */
    public KafkaProducer<String, String> createStringProducer(Properties overrideConfig) {
        return createProducer(new StringSerializer(), new StringSerializer(), overrideConfig);
    }

    /**
     * @return a producer with {@link ByteArraySerializer} for keys and values and no overrides
     */
    public KafkaProducer<byte[], byte[]> createByteProducer() {
        return createProducer(new ByteArraySerializer(), new ByteArraySerializer(), null);
    }

    /**
     * @param overrideConfig extra configuration passed through to the broker
     * @return a producer with {@link ByteArraySerializer} for keys and values
     */
    public KafkaProducer<byte[], byte[]> createByteProducer(Properties overrideConfig) {
        return createProducer(new ByteArraySerializer(), new ByteArraySerializer(), overrideConfig);
    }

    /**
     * Sends one record per map entry to {@code topic} and then flushes the producer.
     * Records are sent in the iteration order of {@code data}. The producer is not closed.
     *
     * @param topic    destination topic
     * @param producer producer used to send the records
     * @param data     key/value pairs to send
     */
    public <K, V> void produce(String topic, KafkaProducer<K, V> producer, Map<K, V> data) {
        data.forEach((key, value) -> producer.send(new ProducerRecord<K, V>(topic, key, value)));
        producer.flush();
    }

    /**
     * Sends every given string to {@code topic} using a temporary string producer,
     * which is flushed and closed before this method returns. Each record's key is
     * the decimal string of the value's {@code hashCode()}.
     *
     * <p>Values are sent in argument order. Repeated values, and distinct values that
     * happen to share a hash code, are each sent as their own record.
     *
     * @param topic  destination topic
     * @param values values to send; must not contain {@code null}
     */
    public void produceStrings(String topic, String... values) {
        try (KafkaProducer<String, String> producer = createStringProducer()) {
            // Send directly instead of collecting into a key->value map first: a map
            // keyed by hash code rejects duplicates/collisions and scrambles the order.
            for (String value : values) {
                String key = String.valueOf(value.hashCode());
                producer.send(new ProducerRecord<String, String>(topic, key, value));
            }
            producer.flush();
        }
    }

    /**
     * Creates a consumer by delegating to the broker.
     *
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param overrideConfig    extra configuration passed through to the broker; the
     *                          no-argument convenience methods of this class pass {@code null}
     * @return the consumer created by the broker
     */
    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Properties overrideConfig) {
        return broker.createConsumer(keyDeserializer, valueDeserializer, overrideConfig);
    }

    /**
     * @return a consumer with {@link StringDeserializer} for keys and values and no overrides
     */
    public KafkaConsumer<String, String> createStringConsumer() {
        return createConsumer(new StringDeserializer(), new StringDeserializer(), null);
    }

    /**
     * @param overrideConfig extra configuration passed through to the broker
     * @return a consumer with {@link StringDeserializer} for keys and values
     */
    public KafkaConsumer<String, String> createStringConsumer(Properties overrideConfig) {
        return createConsumer(new StringDeserializer(), new StringDeserializer(), overrideConfig);
    }

    /**
     * @return a consumer with {@link ByteArrayDeserializer} for keys and values and no overrides
     */
    public KafkaConsumer<byte[], byte[]> createByteConsumer() {
        return createConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), null);
    }

    /**
     * @param overrideConfig extra configuration passed through to the broker
     * @return a consumer with {@link ByteArrayDeserializer} for keys and values
     */
    public KafkaConsumer<byte[], byte[]> createByteConsumer(Properties overrideConfig) {
        return createConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), overrideConfig);
    }

    /**
     * Subscribes {@code consumer} to {@code topic} and starts a background task that
     * polls until {@code numMessagesToConsume} records have been collected or the
     * polling thread is interrupted. The background task closes the consumer when it
     * finishes, so the caller must not reuse it afterwards.
     *
     * @param topic                topic to subscribe to
     * @param consumer             consumer to poll with; closed by the background task
     * @param numMessagesToConsume number of records to collect
     * @return a future completing with the collected records
     */
    public <K, V> ListenableFuture<List<ConsumerRecord<K, V>>> consume(String topic, KafkaConsumer<K, V> consumer, int numMessagesToConsume) {
        consumer.subscribe(Collections.singletonList(topic));
        ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        try {
            return executor.submit(new RecordConsumer<K, V>(numMessagesToConsume, consumer));
        } finally {
            // The executor is used for exactly one task. shutdown() does not cancel the
            // task that was just submitted; it only lets the worker thread terminate once
            // that task is done instead of lingering for the life of the JVM.
            executor.shutdown();
        }
    }

    /**
     * Consumes string records from {@code topic} with a freshly created string
     * consumer and returns only their values.
     *
     * @param topic                topic to subscribe to
     * @param numMessagesToConsume number of records to collect
     * @return a future completing with the values of the collected records
     */
    public ListenableFuture<List<String>> consumeStrings(String topic, int numMessagesToConsume) {
        KafkaConsumer<String, String> consumer = createStringConsumer();
        ListenableFuture<List<ConsumerRecord<String, String>>> records;
        try {
            records = consume(topic, consumer, numMessagesToConsume);
        } catch (RuntimeException e) {
            // The polling task normally closes the consumer; if it was never started
            // nobody else will, so release it here before propagating the failure.
            consumer.close();
            throw e;
        }
        return Futures.transform(records, this::extractValues);
    }

    /** Maps records to their values; a {@code null} list yields an empty list. */
    private List<String> extractValues(List<ConsumerRecord<String, String>> records) {
        if (records == null) {
            return Collections.emptyList();
        }
        return records.stream().map(ConsumerRecord::value).collect(Collectors.toList());
    }

    /**
     * Task that polls a consumer until a fixed number of records has been collected
     * or the executing thread is interrupted, and closes the consumer when done.
     */
    public static class RecordConsumer<K, V> implements Callable<List<ConsumerRecord<K, V>>> {
        /** Upper bound, in milliseconds, for a single poll; avoids spinning on an empty topic. */
        private static final long POLL_TIMEOUT_MS = 100L;

        private final int numRecordsToPoll;
        private final KafkaConsumer<K, V> consumer;

        RecordConsumer(int numRecordsToPoll, KafkaConsumer<K, V> consumer) {
            this.numRecordsToPoll = numRecordsToPoll;
            this.consumer = consumer;
        }

        /**
         * Polls until {@code numRecordsToPoll} records are collected or the current
         * thread is interrupted. Offsets are committed only once the requested number
         * of records has been reached; records beyond that number in the last polled
         * batch are neither returned nor committed. The consumer is always closed.
         *
         * @return the collected records, in the order they were polled
         */
        @Override
        public List<ConsumerRecord<K, V>> call() throws Exception {
            try {
                // Next offset to read per partition, for the records handed to the caller.
                Map<TopicPartition, OffsetAndMetadata> commitBuffer = new HashMap<>();
                List<ConsumerRecord<K, V>> polledMessages = new ArrayList<>(numRecordsToPoll);
                while (polledMessages.size() < numRecordsToPoll && !Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<K, V> records = consumer.poll(POLL_TIMEOUT_MS);
                    for (ConsumerRecord<K, V> rec : records) {
                        polledMessages.add(rec);
                        commitBuffer.put(
                                new TopicPartition(rec.topic(), rec.partition()),
                                new OffsetAndMetadata(rec.offset() + 1L));
                        if (polledMessages.size() == numRecordsToPoll) {
                            consumer.commitSync(commitBuffer);
                            break;
                        }
                    }
                }
                return polledMessages;
            } finally {
                consumer.close();
            }
        }
    }
}

