/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.kafka;

import io.apicurio.registry.utils.kafka.ConsumerActions;
import io.apicurio.registry.utils.kafka.Oneof2;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerContainer<K, V>
implements ConsumerActions<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerContainer.class);
    // Poll timeout, in milliseconds (one second), used by the shorter constructor.
    public static final long DEFAULT_CONSUMER_POLL_TIMEOUT = TimeUnit.SECONDS.toMillis(1L);
    // Initial retry back-off in milliseconds; applyRetryable starts its delay at this value.
    private static final long MIN_RETRY_DELAY = 100L;
    // Upper bound of the retry back-off in milliseconds; applyRetryable caps its doubling delay at this value.
    private static final long MAX_RETRY_DELAY = 10000L;
    // Source of the numeric suffix in consumer thread names ("kafka-consumer-container-N").
    private static final AtomicInteger containerCount = new AtomicInteger();
    // Monitor held by submit() while it checks 'closed' and enqueues, and by close() for its whole duration.
    private final Object lock = new Object();
    // Configuration and deserializers handed to the KafkaConsumer created in consumerLoop().
    private final Properties consumerProperties;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // Timeout passed to every Consumer.poll() call.
    private final Duration consumerPollTimeout;
    // Per-record handler; null unless the Oneof2 given to the constructor holds its first alternative.
    private final java.util.function.Consumer<? super ConsumerRecord<K, V>> recordHandler;
    // Per-batch handler; null unless the Oneof2 given to the constructor holds its second alternative.
    private final java.util.function.Consumer<? super ConsumerRecords<K, V>> recordsHandler;
    // Receives RuntimeExceptions (other than IllegalStateException) thrown by Consumer.poll().
    private final BiConsumer<? super Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler;
    // Milliseconds after which a tracked partition without new records is reported to idlePingHandler.
    private final long idlePingTimeout;
    // Callback for idle partitions; may be null, in which case consumerLoop() does no idle tracking.
    private final java.util.function.Consumer<? super TopicPartition> idlePingHandler;
    // The thread running consumerLoop(); created and started in the constructor.
    private final Thread thread;
    // Futures enqueued by submit(); the consumer thread completes each one with the consumer instance.
    private final BlockingQueue<CompletableFuture<Consumer<K, V>>> tasks = new LinkedTransferQueue<CompletableFuture<Consumer<K, V>>>();
    // Set to true by the action that close() submits; read by submit(), the consumer loop and the retry loop.
    private volatile boolean closed;

    /**
     * Creates a container with the default poll timeout and without idle pings.
     *
     * <p>Delegates to the full constructor, passing {@link #DEFAULT_CONSUMER_POLL_TIMEOUT} as the
     * poll timeout, {@code 0} as the idle ping timeout and {@code null} as the idle ping handler.
     *
     * @param consumerProperties       configuration for the Kafka consumer; must not be null
     * @param keyDeserializer          key deserializer; must not be null
     * @param valueDeserializer        value deserializer; must not be null
     * @param recordOrRecordsHandler   either a per-record or a per-batch handler
     * @param consumerExceptionHandler handler for runtime exceptions raised while polling; must not be null
     */
    public ConsumerContainer(
            Properties consumerProperties,
            Deserializer<K> keyDeserializer,
            Deserializer<V> valueDeserializer,
            Oneof2<java.util.function.Consumer<? super ConsumerRecord<K, V>>, java.util.function.Consumer<? super ConsumerRecords<K, V>>> recordOrRecordsHandler,
            BiConsumer<? super Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler) {
        this(
                consumerProperties,
                keyDeserializer,
                valueDeserializer,
                DEFAULT_CONSUMER_POLL_TIMEOUT,
                recordOrRecordsHandler,
                consumerExceptionHandler,
                0L,
                null);
    }

    public ConsumerContainer(Properties consumerProperties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, long consumerPollTimeout, Oneof2<java.util.function.Consumer<? super ConsumerRecord<K, V>>, java.util.function.Consumer<? super ConsumerRecords<K, V>>> recordOrRecordsHandler, BiConsumer<? super Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler, long idlePingTimeout, java.util.function.Consumer<? super TopicPartition> idlePingHandler) {
        this.consumerProperties = Objects.requireNonNull(consumerProperties);
        this.keyDeserializer = Objects.requireNonNull(keyDeserializer);
        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
        this.consumerPollTimeout = Duration.ofMillis(consumerPollTimeout);
        this.recordHandler = recordOrRecordsHandler.isFirst() ? recordOrRecordsHandler.getFirst() : null;
        this.recordsHandler = recordOrRecordsHandler.isSecond() ? recordOrRecordsHandler.getSecond() : null;
        this.consumerExceptionHandler = Objects.requireNonNull(consumerExceptionHandler);
        this.idlePingTimeout = idlePingTimeout;
        this.idlePingHandler = idlePingHandler;
        this.thread = new Thread(this::consumerLoop, "kafka-consumer-container-" + containerCount.incrementAndGet());
        this.thread.start();
    }

    /**
     * Submits an action to be executed against the Kafka consumer by the consumer thread.
     *
     * <p>The action is attached to a future that the consumer thread later completes with the
     * consumer instance. If the container is already closed, the returned future is completed
     * exceptionally with an {@link IllegalStateException} instead of being queued.
     *
     * @param consumerAction the action to apply to the consumer; must not be null
     * @param <R>            the result type of the action
     * @return a future carrying the action's result
     * @throws NullPointerException  if {@code consumerAction} is null
     * @throws IllegalStateException if called from the consumer thread itself
     */
    @Override
    public final <R> CompletableFuture<R> submit(Function<? super Consumer<K, V>, ? extends R> consumerAction) {
        Objects.requireNonNull(consumerAction);
        if (Thread.currentThread() == this.thread) {
            throw new IllegalStateException("Don't submit actions from consumer thread");
        }
        synchronized (this.lock) {
            // Fully typed futures: with raw types thenApply() yields a CompletionStage that
            // cannot be returned as CompletableFuture<R>.
            CompletableFuture<Consumer<K, V>> consumerTask = new CompletableFuture<>();
            CompletableFuture<R> actionTask = consumerTask.thenApply(consumerAction);
            if (this.closed) {
                consumerTask.completeExceptionally(new IllegalStateException("Container already closed"));
            } else {
                this.tasks.add(consumerTask);
            }
            return actionTask;
        }
    }

    /**
     * Body of the consumer thread: creates the Kafka consumer, then alternates between running
     * submitted tasks and polling for records until the container is closed.
     *
     * <p>Each iteration first takes one queued task, if any, and completes it with the consumer.
     * Only when no task is pending does it poll, dispatch the records to the handler and, when an
     * idle ping handler is configured, report partitions that have had no records for at least
     * {@code idlePingTimeout} milliseconds.
     *
     * <p>When the loop ends for any reason, including an unexpected exception, the container is
     * marked closed and tasks still in the queue are failed, so callers waiting on them are released.
     */
    private void consumerLoop() {
        // True after poll() failed with IllegalStateException; the loop then blocks on the task
        // queue until a task leaves the consumer with a subscription or an assignment.
        boolean waitingForSubscriptionOrAssignment = false;
        // Epoch millis at which each partition last delivered records or was pinged;
        // null when idle pings are disabled.
        HashMap<TopicPartition, Long> activeTopics = this.idlePingHandler == null ? null : new HashMap<TopicPartition, Long>();
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(this.consumerProperties, this.keyDeserializer, this.valueDeserializer)) {
            while (!this.closed) {
                CompletableFuture<Consumer<K, V>> task;
                boolean interrupted = false;
                try {
                    // Block for a task only while there is nothing to poll; otherwise just peek.
                    task = waitingForSubscriptionOrAssignment ? this.tasks.take() : this.tasks.poll();
                } catch (InterruptedException e) {
                    log.warn("Consumer thread interrupted", e);
                    task = null;
                    interrupted = true;
                }
                if (task != null) {
                    // Completing the future runs the submitted action on this thread.
                    task.complete(consumer);
                    if (waitingForSubscriptionOrAssignment) {
                        waitingForSubscriptionOrAssignment = consumer.subscription().isEmpty() && consumer.assignment().isEmpty();
                    }
                    continue;
                }
                if (interrupted) {
                    continue;
                }
                assert !waitingForSubscriptionOrAssignment;

                ConsumerRecords<K, V> records = null;
                try {
                    records = consumer.poll(this.consumerPollTimeout);
                } catch (IllegalStateException e) {
                    log.info("{} - will wait", e.getMessage());
                    waitingForSubscriptionOrAssignment = true;
                } catch (RuntimeException e) {
                    this.consumerExceptionHandler.accept(consumer, e);
                }

                long time = System.currentTimeMillis();
                if (records != null) {
                    if (activeTopics != null) {
                        for (TopicPartition partition : records.partitions()) {
                            activeTopics.put(partition, time);
                        }
                    }
                    this.handleRecords(records, consumer);
                }
                if (activeTopics == null) {
                    continue;
                }

                // Collect partitions whose last activity is at least idlePingTimeout ms old and
                // restart their idle clock.
                ArrayList<TopicPartition> idlePartitions = null;
                for (Map.Entry<TopicPartition, Long> entry : activeTopics.entrySet()) {
                    if (time - entry.getValue() < this.idlePingTimeout) {
                        continue;
                    }
                    if (idlePartitions == null) {
                        idlePartitions = new ArrayList<>(4);
                    }
                    idlePartitions.add(entry.getKey());
                    entry.setValue(time);
                }
                if (idlePartitions != null) {
                    idlePartitions.forEach(this.idlePingHandler);
                    // Stop tracking partitions that are no longer assigned to this consumer.
                    activeTopics.keySet().retainAll(consumer.assignment());
                }
            }
            log.info("Consumer loop finished");
        } catch (Throwable e) {
            log.warn("Exception caught in consumer polling thread", e);
        } finally {
            this.failPendingTasks();
            log.info("Consumer thread terminating");
        }
    }

    /**
     * Marks the container closed and fails every task still queued.
     *
     * <p>Without this, an abnormal end of the consumer thread would leave {@code closed} false
     * and queued futures incomplete, so {@code close()} and other waiters would block forever.
     * The lock is deliberately not taken here: {@code close()} holds it while waiting for its
     * own task, which may be one of the tasks failed below. As a consequence a submitter that
     * read {@code closed} just before it was set may still enqueue a task afterwards.
     */
    private void failPendingTasks() {
        this.closed = true;
        CompletableFuture<Consumer<K, V>> pending;
        while ((pending = this.tasks.poll()) != null) {
            pending.completeExceptionally(new IllegalStateException("Consumer thread terminated before the action could run"));
        }
    }

    /**
     * Dispatches a polled batch to whichever handler was configured.
     *
     * <p>When a batch handler is present it receives the whole batch in one call; otherwise each
     * record is passed to the per-record handler in iteration order. Both paths go through the
     * retrying wrapper.
     *
     * @param records  the batch returned by the last poll
     * @param consumer the consumer, made available to tasks executed while a retry is pending
     */
    private void handleRecords(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
        if (this.recordsHandler != null) {
            this.acceptRetryable(records, this.recordsHandler, consumer);
            return;
        }
        assert this.recordHandler != null;
        // Typed loop variable: a raw ConsumerRecord would make the call below unchecked.
        for (ConsumerRecord<K, V> record : records) {
            this.acceptRetryable(record, this.recordHandler, consumer);
        }
    }

    /**
     * Runs a void handler through {@code applyRetryable} by adapting it to a function that
     * returns {@code null}.
     *
     * @param record   the record or batch to hand to the handler
     * @param handler  the handler to invoke
     * @param consumer the consumer passed on to {@code applyRetryable}
     * @param <T>      the type of the handled item
     */
    private <T> void acceptRetryable(T record, java.util.function.Consumer<? super T> handler, Consumer<K, V> consumer) {
        Function<T, Void> adapter = item -> {
            handler.accept(item);
            return null;
        };
        this.applyRetryable(record, adapter, consumer);
    }

    /**
     * Applies the handler to the record, retrying with exponential back-off until it succeeds
     * or the container is closed.
     *
     * <p>After an {@link Exception} the method waits on the task queue, completing any submitted
     * tasks with the consumer while it waits, and then tries again. The delay starts at
     * {@code MIN_RETRY_DELAY} and doubles up to {@code MAX_RETRY_DELAY}. Any other
     * {@link Throwable} is logged and the JVM is terminated.
     *
     * @param record   the record or batch to process
     * @param handler  the function to apply
     * @param consumer the consumer handed to tasks completed while waiting for a retry
     * @param <T>      the type of the processed item
     * @param <R>      the handler's result type
     * @return the handler's result, or {@code null} if the container was closed before the
     *         handler succeeded
     */
    private <T, R> R applyRetryable(T record, Function<? super T, ? extends R> handler, Consumer<K, V> consumer) {
        long delay = MIN_RETRY_DELAY;
        boolean interrupted = false;
        // The interrupt flag is restored once, on the way out. Restoring it from a finally
        // inside the loop with a 'continue' would discard the pending return value and
        // re-invoke the handler after a success.
        try {
            while (!this.closed) {
                try {
                    return handler.apply(record);
                } catch (Exception e) {
                    log.warn("Exception caught while processing {} - retrying in {} ms", ConsumerContainer.formatRecord(record), delay, e);
                    try {
                        // Keep serving submitted tasks during the back-off; a poll that times
                        // out (returns null) ends the wait.
                        CompletableFuture<Consumer<K, V>> task;
                        while (!this.closed && (task = this.tasks.poll(delay, TimeUnit.MILLISECONDS)) != null) {
                            task.complete(consumer);
                        }
                    } catch (InterruptedException ie) {
                        log.info("Interrupted - keeping retrying");
                        interrupted = true;
                    }
                    delay = Math.min(delay * 2L, MAX_RETRY_DELAY);
                } catch (Throwable e) {
                    try {
                        log.error("Error caught while processing {} - exiting JVM", ConsumerContainer.formatRecord(record), e);
                    } finally {
                        System.exit(-1);
                    }
                }
            }
            return null;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds a short description of a record or batch for log messages.
     *
     * @param record a {@code ConsumerRecord}, a {@code ConsumerRecords} batch, or any other object
     * @return topic, partition, offset and timestamp for a single record; count and partitions
     *         for a batch; {@code String.valueOf(record)} for anything else
     */
    private static String formatRecord(Object record) {
        if (record instanceof ConsumerRecords) {
            ConsumerRecords<?, ?> batch = (ConsumerRecords<?, ?>) record;
            return String.format("%d messages from topic-partitions %s", batch.count(), batch.partitions());
        }
        if (record instanceof ConsumerRecord) {
            ConsumerRecord<?, ?> single = (ConsumerRecord<?, ?>) record;
            return String.format(
                    "message from topic-partition %s-%s, offset %d, timestamp %tc",
                    single.topic(),
                    single.partition(),
                    single.offset(),
                    single.timestamp());
        }
        return String.valueOf(record);
    }

    /**
     * Closes the container by submitting an action that sets the closed flag and waiting for
     * the consumer thread to run it.
     *
     * <p>The lock is held for the whole call, so concurrent {@code submit} calls wait until the
     * flag has been set. Calling this on an already closed container does nothing. Because the
     * shutdown action goes through {@code submit}, this method must not be called from the
     * consumer thread.
     */
    @Override
    public void close() {
        synchronized (this.lock) {
            if (!this.closed) {
                CompletableFuture<Object> shutdown = this.submit(consumer -> {
                    this.closed = true;
                    return null;
                });
                shutdown.join();
            }
        }
    }

    /**
     * A resizable group of {@link ConsumerContainer}s that all subscribe to the same topic.
     *
     * <p>Every container is created with the same properties, deserializers and handlers and
     * runs its own consumer thread. Public methods are synchronized on the pool instance.
     *
     * @param <K> record key type
     * @param <V> record value type
     */
    public static class DynamicPool<K, V>
    implements AutoCloseable {
        private final Properties consumerProperties;
        private final Deserializer<K> keyDeserializer;
        private final Deserializer<V> valueDeserializer;
        private final String topic;
        private final Oneof2<java.util.function.Consumer<? super ConsumerRecord<K, V>>, java.util.function.Consumer<? super ConsumerRecords<K, V>>> recordOrRecordsHandler;
        private final BiConsumer<? super Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler;
        // Containers currently managed by the pool; the most recently added is removed first.
        private final LinkedList<ConsumerContainer<K, V>> containers = new LinkedList<>();
        private volatile boolean closed;

        /**
         * Creates the pool and starts the initial number of consumer threads.
         *
         * @param consumerProperties       configuration for each Kafka consumer; must not be null
         * @param keyDeserializer          key deserializer; must not be null
         * @param valueDeserializer        value deserializer; must not be null
         * @param topic                    the topic every container subscribes to; must not be null
         * @param initialConsumerThreads   number of containers to start; must be non-negative
         * @param recordOrRecordsHandler   per-record or per-batch handler; must not be null
         * @param consumerExceptionHandler handler for runtime exceptions raised while polling; must not be null
         * @throws IllegalArgumentException if {@code initialConsumerThreads} is negative
         */
        public DynamicPool(Properties consumerProperties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String topic, int initialConsumerThreads, Oneof2<java.util.function.Consumer<? super ConsumerRecord<K, V>>, java.util.function.Consumer<? super ConsumerRecords<K, V>>> recordOrRecordsHandler, BiConsumer<? super Consumer<?, ?>, ? super RuntimeException> consumerExceptionHandler) {
            this.consumerProperties = Objects.requireNonNull(consumerProperties);
            this.keyDeserializer = Objects.requireNonNull(keyDeserializer);
            this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
            this.topic = Objects.requireNonNull(topic);
            this.recordOrRecordsHandler = Objects.requireNonNull(recordOrRecordsHandler);
            this.consumerExceptionHandler = Objects.requireNonNull(consumerExceptionHandler);
            this.setConsumerThreads(initialConsumerThreads);
        }

        /**
         * @return the number of containers currently held by the pool
         */
        public synchronized int getConsumerThreads() {
            return this.containers.size();
        }

        /**
         * Grows or shrinks the pool to the requested number of containers.
         *
         * <p>Surplus containers are removed from the end of the list and closed. Missing ones
         * are created and asked to subscribe to the pool's topic; the subscribe request is
         * submitted without waiting for its result.
         *
         * @param consumerThreads the desired number of containers
         * @throws IllegalStateException    if the pool is closed
         * @throws IllegalArgumentException if {@code consumerThreads} is negative
         */
        public synchronized void setConsumerThreads(int consumerThreads) {
            this.checkNotClosed();
            if (consumerThreads < 0) {
                throw new IllegalArgumentException("consumerThreads should be non-negative");
            }
            while (this.containers.size() > consumerThreads) {
                ConsumerContainer<K, V> surplus = this.containers.removeLast();
                surplus.close();
            }
            while (this.containers.size() < consumerThreads) {
                ConsumerContainer<K, V> container = new ConsumerContainer<K, V>(this.consumerProperties, this.keyDeserializer, this.valueDeserializer, this.recordOrRecordsHandler, this.consumerExceptionHandler);
                container.submit(consumer -> {
                    consumer.subscribe(Collections.singletonList(this.topic));
                    return null;
                });
                this.containers.addLast(container);
            }
        }

        /** Throws {@link IllegalStateException} when the pool has been closed. */
        private void checkNotClosed() {
            if (this.closed) {
                throw new IllegalStateException("Container already closed");
            }
        }

        /**
         * Closes every container and marks the pool closed; further calls do nothing.
         *
         * <p>All containers are attempted even if closing one of them fails, so a single
         * failure does not leave the remaining consumer threads running. The first
         * {@link RuntimeException} is rethrown after the sweep, with later ones attached as
         * suppressed exceptions.
         */
        @Override
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            RuntimeException failure = null;
            try {
                for (ConsumerContainer<K, V> container : this.containers) {
                    try {
                        container.close();
                    } catch (RuntimeException e) {
                        if (failure == null) {
                            failure = e;
                        } else {
                            failure.addSuppressed(e);
                        }
                    }
                }
            } finally {
                this.closed = true;
            }
            if (failure != null) {
                throw failure;
            }
        }
    }
}

