/*
 * Decompiled with CFR 0.152.
 */
package io.datanerds.verteiler;

import com.google.common.util.concurrent.MoreExecutors;
import io.datanerds.verteiler.ConsumerRecordRelay;
import io.datanerds.verteiler.KafkaConfigValidator;
import io.datanerds.verteiler.Processor;
import io.datanerds.verteiler.exception.ConsumerException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlockingQueueConsumer<K, V>
implements ConsumerRebalanceListener {
    private static final Logger logger = LoggerFactory.getLogger(BlockingQueueConsumer.class);
    private final Properties kafkaConfig;
    private final String topic;
    private final int queueSize;
    private final java.util.function.Consumer<V> action;
    private final Map<Integer, Processor<K, V>> processors = new ConcurrentHashMap<Integer, Processor<K, V>>();
    private final ExecutorService pool;
    private final Consumer<K, V> consumer;
    private final Object lock = new Object();
    private volatile ConsumerRecordRelay<K, V> relay;

    public BlockingQueueConsumer(String topic, Properties kafkaConfig, int queueSize, java.util.function.Consumer<V> action) {
        this.topic = topic;
        KafkaConfigValidator.validate(kafkaConfig);
        this.kafkaConfig = kafkaConfig;
        this.action = action;
        this.queueSize = queueSize;
        this.pool = Executors.newCachedThreadPool();
        this.consumer = this.createKafkaConsumer();
        this.consumer.subscribe(Arrays.asList(topic), (ConsumerRebalanceListener)this);
        Set partitions = this.consumer.assignment();
        partitions.forEach(this::createProcessor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        Object object = this.lock;
        synchronized (object) {
            if (this.relay != null) {
                throw new IllegalStateException("Consumer already started");
            }
            this.relay = new ConsumerRecordRelay<K, V>(this.consumer, this);
            new Thread(this.relay).start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        Object object = this.lock;
        synchronized (object) {
            if (this.relay == null) {
                throw new IllegalStateException("Consumer not started, nothing to stop");
            }
            this.relay.stop();
            if (!MoreExecutors.shutdownAndAwaitTermination((ExecutorService)this.pool, (long)10L, (TimeUnit)TimeUnit.SECONDS)) {
                logger.error("Pool was not terminated properly.");
            }
        }
    }

    void relay(ConsumerRecord<K, V> message) throws InterruptedException {
        if (!this.topic.equals(message.topic())) {
            throw new ConsumerException(String.format("Message from unexpected topic: '%s'", message.topic()));
        }
        Processor<K, V> processor = this.processors.get(message.partition());
        processor.queue(message);
    }

    private void createProcessor(TopicPartition partition) {
        Processor<K, V> processor = new Processor<K, V>(partition, this.relay, this.action, this.queueSize);
        this.pool.execute(processor);
        this.processors.put(partition.partition(), processor);
    }

    private Consumer<K, V> createKafkaConsumer() {
        this.addClientIdIfNotPresent(this.kafkaConfig);
        this.addDisableAutoCommitIfNotPresent(this.kafkaConfig);
        return new KafkaConsumer(this.kafkaConfig);
    }

    private void addClientIdIfNotPresent(Properties props) {
        if (!props.contains("client.id")) {
            props.put("client.id", this.getClientId());
        }
    }

    private void addDisableAutoCommitIfNotPresent(Properties props) {
        if (!props.contains("enable.auto.commit")) {
            props.put("enable.auto.commit", "false");
        }
    }

    private String getClientId() {
        try {
            return String.format("%s-%s", InetAddress.getLocalHost().getHostName(), this.topic);
        }
        catch (UnknownHostException ex) {
            throw new ConsumerException("Could not retrieve client identifier", ex);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(this::createProcessor);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(partition -> {
            Processor<K, V> processor = this.processors.get(partition.partition());
            processor.stop();
            this.processors.remove(partition.partition());
            this.relay.removePartitionFromOffset((TopicPartition)partition);
        });
    }
}

