/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.consumer.impl;

import io.vertx.core.Completable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaReadStreamImpl<K, V>
implements KafkaReadStream<K, V> {
    // Process-wide counter used to give each worker thread a unique name suffix (see start()).
    private static final AtomicInteger threadCount = new AtomicInteger(0);
    // Vert.x context on which user handlers and completion callbacks are run.
    private final Context context;
    // Starts as true; flipped to false by the first compareAndSet(true, false) in
    // submitTask/subscribe/assign, and back to true by close().
    private final AtomicBoolean closed = new AtomicBoolean(true);
    // The wrapped Kafka consumer; within this class it is only called from worker tasks,
    // except for wakeup() in close().
    private final Consumer<K, V> consumer;
    // Tracer created in the constructor; tracedHandler() treats null as "tracing disabled".
    private final ConsumerTracer tracer;
    // Set to true by startConsuming(); schedule() does nothing while it is false.
    private final AtomicBoolean consuming = new AtomicBoolean(false);
    // Outstanding demand: Long.MAX_VALUE means unbounded, 0 means paused (see pause()/fetch()).
    private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
    // Guards pollRecords() so that at most one poll task is queued at a time.
    private final AtomicBoolean polling = new AtomicBoolean(false);
    // Per-record handler installed through handler().
    private Handler<ConsumerRecord<K, V>> recordHandler;
    // Receives exceptions thrown by worker tasks and by polling.
    private Handler<Throwable> exceptionHandler;
    // Iterator over the most recently polled batch; reset to null by the seek* methods.
    private Iterator<ConsumerRecord<K, V>> current;
    // Whole-batch handler installed through batchHandler().
    private Handler<ConsumerRecords<K, V>> batchHandler;
    // Rebalance callbacks forwarded to the context by rebalanceListener.
    private Handler<Set<TopicPartition>> partitionsRevokedHandler;
    private Handler<Set<TopicPartition>> partitionsAssignedHandler;
    // Timeout passed to Consumer.poll() by the background polling loop; defaults to one second.
    private Duration pollTimeout = Duration.ofSeconds(1L);
    // Single-thread executor created by start(); null until the first task is submitted.
    private ExecutorService worker;
    /**
     * Listener handed to the Kafka consumer on subscribe. Each callback forwards the
     * affected partitions to the matching user handler, if one is set, on the Vert.x context.
     */
    private final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.forward(KafkaReadStreamImpl.this.partitionsRevokedHandler, partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            this.forward(KafkaReadStreamImpl.this.partitionsAssignedHandler, partitions);
        }

        // Runs the given handler on the context with a Set view of the partitions; no-op when unset.
        private void forward(Handler<Set<TopicPartition>> target, Collection<TopicPartition> partitions) {
            if (target == null) {
                return;
            }
            KafkaReadStreamImpl.this.context.runOnContext(v -> target.handle(Helper.toSet(partitions)));
        }
    };

    public KafkaReadStreamImpl(Vertx vertx, Consumer<K, V> consumer, KafkaClientOptions options) {
        ContextInternal ctxInt = ((ContextInternal)vertx.getOrCreateContext()).unwrap();
        this.consumer = consumer;
        this.context = ctxInt;
        this.tracer = ConsumerTracer.create(ctxInt.tracer(), options);
    }

    /**
     * Creates the single-thread worker executor and queues the first task on it.
     *
     * @param task    work to run against the consumer on the worker thread
     * @param handler completion callback, may be null
     */
    private <T> void start(BiConsumer<Consumer<K, V>, Promise<T>> task, Completable<T> handler) {
        this.worker = Executors.newSingleThreadExecutor(runnable -> {
            String threadName = "vert.x-kafka-consumer-thread-" + threadCount.getAndIncrement();
            return new Thread(runnable, threadName);
        });
        this.submitTaskWhenStarted(task, handler);
    }

    /**
     * Queues {@code task} on the worker executor.
     *
     * <p>When {@code handler} is non-null a promise is created and passed to the task, and its
     * outcome is relayed to {@code handler} on the Vert.x context. When {@code handler} is null
     * the task receives a null promise. An exception thrown by the task fails the promise
     * (if any) and is then passed to the exception handler, if one is set.
     *
     * @throws IllegalStateException if the worker has not been created yet
     */
    private <T> void submitTaskWhenStarted(BiConsumer<Consumer<K, V>, Promise<T>> task, Completable<T> handler) {
        if (this.worker == null) {
            throw new IllegalStateException();
        }
        Runnable job = () -> {
            Promise<T> outcome = null;
            if (handler != null) {
                outcome = Promise.promise();
                outcome.future().onComplete((res, err) -> this.context.runOnContext(v -> handler.complete(res, err)));
            }
            try {
                task.accept(this.consumer, outcome);
            } catch (Exception e) {
                if (outcome != null) {
                    outcome.tryFail(e);
                }
                if (this.exceptionHandler != null) {
                    this.exceptionHandler.handle(e);
                }
            }
        };
        this.worker.submit(job);
    }

    /**
     * Queues one poll on the worker thread, unless a poll is already pending.
     *
     * <p>A non-empty batch is delivered to {@code handler} on the Vert.x context. In every other
     * case (stream closed, empty or null result, wakeup, error) {@code handler} is not called;
     * instead the polling flag is cleared and {@code schedule(0)} is invoked on the context.
     *
     * @param handler receives the polled records; only invoked for non-empty batches
     */
    private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
        // Only one poll may be in flight; the flag is cleared again on the context thread.
        if (this.polling.compareAndSet(false, true)) {
            this.worker.submit(() -> {
                block9: {
                    // True once the batch has been handed over to the context for delivery.
                    boolean submitted = false;
                    try {
                        // Leaving the block here still runs the finally clause below.
                        if (this.closed.get()) break block9;
                        try {
                            ConsumerRecords records = this.consumer.poll(this.pollTimeout);
                            if (records != null && records.count() > 0) {
                                submitted = true;
                                this.context.runOnContext(v -> {
                                    this.polling.set(false);
                                    handler.handle((Object)records);
                                });
                            }
                        }
                        catch (WakeupException records) {
                            // Deliberately ignored: close() calls wakeup() to interrupt a blocking poll.
                        }
                        catch (Exception e) {
                            // Reported on the worker thread, not on the Vert.x context.
                            if (this.exceptionHandler != null) {
                                this.exceptionHandler.handle((Object)e);
                            }
                        }
                    }
                    finally {
                        // Nothing was delivered: release the flag and let schedule() decide whether to poll again.
                        if (!submitted) {
                            this.context.runOnContext(v -> {
                                this.polling.set(false);
                                this.schedule(0L);
                            });
                        }
                    }
                }
            });
        }
    }

    /**
     * Schedules one iteration of the consume loop on the Vert.x context.
     *
     * <p>Nothing is scheduled unless consumption has started, demand is positive and at least
     * one of the record/batch handlers is installed. Both handlers are read once, and that same
     * snapshot is used for the guard and for the dispatched {@code run} call. This prevents
     * {@code run(null, null)} from being dispatched when a batch handler is installed between
     * the two reads, which would poll a batch that no handler receives.
     *
     * @param delay delay in milliseconds before running; values {@code <= 0} run without a timer
     */
    private void schedule(long delay) {
        Handler<ConsumerRecord<K, V>> handler = this.recordHandler;
        Handler<ConsumerRecords<K, V>> multiHandler = this.batchHandler;
        if (handler == null && multiHandler == null) {
            return;
        }
        if (!this.consuming.get() || this.demand.get() <= 0L) {
            return;
        }
        this.context.runOnContext(v1 -> {
            if (delay > 0L) {
                this.context.owner().setTimer(delay, v2 -> this.run(handler, multiHandler));
            } else {
                this.run(handler, multiHandler);
            }
        });
    }

    /**
     * One iteration of the consume loop; does nothing once the stream is closed.
     *
     * <p>If no buffered records remain, a poll is requested. Otherwise up to 10 buffered records
     * are emitted to {@code handler}, each consuming one unit of demand, and the loop reschedules
     * itself.
     *
     * @param handler      per-record handler snapshot taken by schedule(), may be null
     * @param multiHandler batch handler snapshot taken by schedule(), may be null
     */
    private void run(Handler<ConsumerRecord<K, V>> handler, Handler<ConsumerRecords<K, V>> multiHandler) {
        if (this.closed.get()) {
            return;
        }
        if (this.current == null || !this.current.hasNext()) {
            this.pollRecords(records -> {
                if (records != null && records.count() > 0) {
                    // The batch is only buffered for per-record delivery when a record handler exists.
                    if (handler != null) {
                        this.current = records.iterator();
                    }
                    if (multiHandler != null) {
                        multiHandler.handle(records);
                    }
                    this.schedule(0L);
                } else {
                    // Defensive branch: pollRecords as written only calls back with non-empty batches.
                    this.schedule(1L);
                }
            });
        } else {
            // Cap the number of records emitted per iteration before rescheduling.
            int count = 0;
            block0: while (this.current.hasNext() && count++ < 10) {
                long v2;
                // Claim one unit of demand; stop emitting entirely as soon as demand reaches zero.
                while ((v2 = this.demand.get()) > 0L) {
                    // Long.MAX_VALUE is treated as unbounded and never decremented; a lost CAS retries.
                    if (v2 != Long.MAX_VALUE && !this.demand.compareAndSet(v2, v2 - 1L)) continue;
                    ConsumerRecord<K, V> next = this.current.next();
                    // Each record is emitted on its own duplicate of the stream's context.
                    ContextInternal ctx = ((ContextInternal)this.context).duplicate();
                    ctx.emit(v -> this.tracedHandler((Context)ctx, handler).handle((Object)next));
                    continue block0;
                }
                break block0;
            }
            this.schedule(0L);
        }
    }

    /**
     * Wraps {@code handler} so that each record is processed inside a tracing span.
     *
     * @param ctx     context the span is attached to
     * @param handler the user record handler
     * @return {@code handler} itself when no tracer is configured, otherwise a wrapper that
     *         finishes the span on success and fails it (then rethrows) on error
     */
    private Handler<ConsumerRecord<K, V>> tracedHandler(Context ctx, Handler<ConsumerRecord<K, V>> handler) {
        if (this.tracer == null) {
            return handler;
        }
        return rec -> {
            ConsumerTracer.StartedSpan span = this.tracer.prepareMessageReceived(ctx, rec);
            try {
                handler.handle(rec);
                span.finish(ctx);
            } catch (Throwable failure) {
                span.fail(ctx, failure);
                throw failure;
            }
        };
    }

    /**
     * Future-returning variant of {@link #submitTask}: runs {@code task} on the worker thread
     * and exposes its outcome as a future.
     *
     * @param task work to run against the consumer; it must complete the promise it is given
     * @return future completed with the task's result or failure
     */
    protected <T> Future<T> submitTask2(BiConsumer<Consumer<K, V>, Promise<T>> task) {
        Promise<T> outcome = Promise.promise();
        this.submitTask(task, outcome);
        return outcome.future();
    }

    /**
     * Runs {@code task} on the worker thread, creating the worker first if the stream is
     * currently marked closed.
     *
     * @param task    work to run against the consumer
     * @param handler completion callback, may be null
     */
    protected <T> void submitTask(BiConsumer<Consumer<K, V>, Promise<T>> task, Completable<T> handler) {
        boolean needsStart = this.closed.compareAndSet(true, false);
        if (!needsStart) {
            this.submitTaskWhenStarted(task, handler);
            return;
        }
        this.start(task, handler);
    }

    /**
     * Pauses fetching from the given partitions on the underlying consumer.
     *
     * @param topicPartitions partitions to pause
     * @return future completed once the consumer call has returned
     */
    @Override
    public Future<Void> pause(Set<TopicPartition> topicPartitions) {
        BiConsumer<Consumer<K, V>, Promise<Void>> pauseTask = (kafka, done) -> {
            kafka.pause(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        return this.submitTask2(pauseTask);
    }

    /**
     * Queries the underlying consumer for its paused partitions.
     *
     * @return future completed with the set returned by the consumer
     */
    @Override
    public Future<Set<TopicPartition>> paused() {
        BiConsumer<Consumer<K, V>, Promise<Set<TopicPartition>>> query = (kafka, done) -> {
            Set<TopicPartition> pausedPartitions = kafka.paused();
            if (done != null) {
                done.complete(pausedPartitions);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Resumes fetching from the given partitions on the underlying consumer.
     *
     * @param topicPartitions partitions to resume
     * @return future completed once the consumer call has returned
     */
    @Override
    public Future<Void> resume(Set<TopicPartition> topicPartitions) {
        BiConsumer<Consumer<K, V>, Promise<Void>> resumeTask = (kafka, done) -> {
            kafka.resume(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        return this.submitTask2(resumeTask);
    }

    /**
     * Looks up the committed offset for a partition via the underlying consumer.
     *
     * @param topicPartition partition to query
     * @return future completed with the value returned by the consumer
     */
    @Override
    public Future<OffsetAndMetadata> committed(TopicPartition topicPartition) {
        BiConsumer<Consumer<K, V>, Promise<OffsetAndMetadata>> query = (kafka, done) -> {
            OffsetAndMetadata committedOffset = kafka.committed(topicPartition);
            if (done != null) {
                done.complete(committedOffset);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Discards any buffered records, then seeks the given partitions to their end.
     *
     * @param topicPartitions partitions to seek
     * @return future completed once the consumer call has returned
     */
    @Override
    public Future<Void> seekToEnd(Set<TopicPartition> topicPartitions) {
        Promise<Void> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Void>> seekTask = (kafka, done) -> {
            kafka.seekToEnd(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        // The buffered iterator is dropped on the context before the seek is queued.
        this.context.runOnContext(ignored -> {
            this.current = null;
            this.submitTask(seekTask, result);
        });
        return result.future();
    }

    /**
     * Discards any buffered records, then seeks the given partitions to their beginning.
     *
     * @param topicPartitions partitions to seek
     * @return future completed once the consumer call has returned
     */
    @Override
    public Future<Void> seekToBeginning(Set<TopicPartition> topicPartitions) {
        Promise<Void> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Void>> seekTask = (kafka, done) -> {
            kafka.seekToBeginning(topicPartitions);
            if (done != null) {
                done.complete();
            }
        };
        // The buffered iterator is dropped on the context before the seek is queued.
        this.context.runOnContext(ignored -> {
            this.current = null;
            this.submitTask(seekTask, result);
        });
        return result.future();
    }

    /**
     * Discards any buffered records, then seeks a partition to the given offset.
     *
     * @param topicPartition partition to seek
     * @param offset         offset passed to the consumer
     * @return future completed once the consumer call has returned
     */
    @Override
    public Future<Void> seek(TopicPartition topicPartition, long offset) {
        Promise<Void> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Void>> seekTask = (kafka, done) -> {
            kafka.seek(topicPartition, offset);
            if (done != null) {
                done.complete();
            }
        };
        // The buffered iterator is dropped on the context before the seek is queued.
        this.context.runOnContext(ignored -> {
            this.current = null;
            this.submitTask(seekTask, result);
        });
        return result.future();
    }

    /**
     * Discards any buffered records, then seeks a partition using offset-and-metadata.
     *
     * @param topicPartition    partition to seek
     * @param offsetAndMetadata position passed to the consumer
     * @return future completed once the consumer call has returned
     */
    @Override
    public Future<Void> seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        Promise<Void> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Void>> seekTask = (kafka, done) -> {
            kafka.seek(topicPartition, offsetAndMetadata);
            if (done != null) {
                done.complete();
            }
        };
        // The buffered iterator is dropped on the context before the seek is queued.
        this.context.runOnContext(ignored -> {
            this.current = null;
            this.submitTask(seekTask, result);
        });
        return result.future();
    }

    /**
     * Installs the callback notified when partitions are revoked during a rebalance.
     *
     * @param handler callback to install, or null to clear it
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> partitionsRevokedHandler(Handler<Set<TopicPartition>> handler) {
        partitionsRevokedHandler = handler;
        return this;
    }

    /**
     * Installs the callback notified when partitions are assigned during a rebalance.
     *
     * @param handler callback to install, or null to clear it
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> partitionsAssignedHandler(Handler<Set<TopicPartition>> handler) {
        partitionsAssignedHandler = handler;
        return this;
    }

    /**
     * Subscribes the consumer to the given topics and starts the consume loop.
     *
     * @param topics topic names to subscribe to
     * @return future completed once the subscription call has returned
     */
    @Override
    public Future<Void> subscribe(Set<String> topics) {
        Promise<Void> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Void>> subscribeTask = (kafka, done) -> {
            kafka.subscribe(topics, this.rebalanceListener);
            this.startConsuming();
            if (done != null) {
                done.complete();
            }
        };
        if (!this.closed.compareAndSet(true, false)) {
            this.submitTask(subscribeTask, result);
        } else {
            this.start(subscribeTask, result);
        }
        return result.future();
    }

    /**
     * Subscribes the consumer to all topics matching a pattern and starts the consume loop.
     *
     * @param pattern topic name pattern
     * @return future completed once the subscription call has returned
     */
    @Override
    public Future<Void> subscribe(Pattern pattern) {
        Promise<Void> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Void>> subscribeTask = (kafka, done) -> {
            kafka.subscribe(pattern, this.rebalanceListener);
            this.startConsuming();
            if (done != null) {
                done.complete();
            }
        };
        if (!this.closed.compareAndSet(true, false)) {
            this.submitTask(subscribeTask, result);
        } else {
            this.start(subscribeTask, result);
        }
        return result.future();
    }

    /**
     * Unsubscribes the underlying consumer.
     *
     * @return future completed once the consumer call has returned
     */
    @Override
    public Future<Void> unsubscribe() {
        BiConsumer<Consumer<K, V>, Promise<Void>> unsubscribeTask = (kafka, done) -> {
            kafka.unsubscribe();
            if (done != null) {
                done.complete();
            }
        };
        return this.submitTask2(unsubscribeTask);
    }

    /**
     * Queries the underlying consumer for its current subscription.
     *
     * @return future completed with the set returned by the consumer
     */
    @Override
    public Future<Set<String>> subscription() {
        BiConsumer<Consumer<K, V>, Promise<Set<String>>> query = (kafka, done) -> {
            Set<String> topics = kafka.subscription();
            if (done != null) {
                done.complete(topics);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Manually assigns partitions to the consumer and starts the consume loop.
     *
     * @param partitions partitions to assign
     * @return future completed once the assignment call has returned
     */
    @Override
    public Future<Void> assign(Set<TopicPartition> partitions) {
        Promise<Void> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Void>> assignTask = (kafka, done) -> {
            kafka.assign(partitions);
            this.startConsuming();
            if (done != null) {
                done.complete();
            }
        };
        if (!this.closed.compareAndSet(true, false)) {
            this.submitTask(assignTask, result);
        } else {
            this.start(assignTask, result);
        }
        return result.future();
    }

    /**
     * Queries the underlying consumer for its currently assigned partitions.
     *
     * @return future completed with the set returned by the consumer
     */
    @Override
    public Future<Set<TopicPartition>> assignment() {
        Promise<Set<TopicPartition>> result = Promise.promise();
        BiConsumer<Consumer<K, V>, Promise<Set<TopicPartition>>> query = (kafka, done) -> {
            Set<TopicPartition> assigned = kafka.assignment();
            if (done != null) {
                done.complete(assigned);
            }
        };
        this.submitTask(query, result);
        return result.future();
    }

    /**
     * Lists topics and their partition metadata via the underlying consumer.
     *
     * @return future completed with the map returned by the consumer
     */
    @Override
    public Future<Map<String, List<PartitionInfo>>> listTopics() {
        BiConsumer<Consumer<K, V>, Promise<Map<String, List<PartitionInfo>>>> query = (kafka, done) -> {
            Map<String, List<PartitionInfo>> byTopic = kafka.listTopics();
            if (done != null) {
                done.complete(byTopic);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Commits using the consumer's no-argument {@code commitSync()}.
     *
     * @return future completed with {@code null} once the commit has returned
     */
    @Override
    public Future<Map<TopicPartition, OffsetAndMetadata>> commit() {
        Map<TopicPartition, OffsetAndMetadata> noExplicitOffsets = null;
        return commit(noExplicitOffsets);
    }

    /**
     * Synchronously commits offsets on the worker thread.
     *
     * @param offsets offsets to commit; when null the consumer's no-argument commit is used
     * @return future completed with {@code offsets} (possibly null) once the commit has returned
     */
    @Override
    public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, OffsetAndMetadata>>> commitTask = (kafka, done) -> {
            if (offsets != null) {
                kafka.commitSync(offsets);
            } else {
                kafka.commitSync();
            }
            if (done != null) {
                done.complete(offsets);
            }
        };
        return this.submitTask2(commitTask);
    }

    /**
     * Fetches partition metadata for a topic via the underlying consumer.
     *
     * @param topic topic name
     * @return future completed with the list returned by the consumer
     */
    @Override
    public Future<List<PartitionInfo>> partitionsFor(String topic) {
        BiConsumer<Consumer<K, V>, Promise<List<PartitionInfo>>> query = (kafka, done) -> {
            List<PartitionInfo> infos = kafka.partitionsFor(topic);
            if (done != null) {
                done.complete(infos);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Installs the handler that receives exceptions raised by worker tasks and polling.
     *
     * @param handler handler to install, or null to clear it
     * @return this stream
     */
    @Override
    public KafkaReadStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        exceptionHandler = handler;
        return this;
    }

    /**
     * Installs the per-record handler and triggers the consume loop.
     *
     * @param handler handler to install, or null to clear it
     * @return this stream
     */
    @Override
    public KafkaReadStreamImpl<K, V> handler(Handler<ConsumerRecord<K, V>> handler) {
        recordHandler = handler;
        schedule(0L);
        return this;
    }

    /**
     * Pauses record delivery by setting demand to zero.
     *
     * @return this stream
     */
    @Override
    public KafkaReadStreamImpl<K, V> pause() {
        demand.set(0L);
        return this;
    }

    /**
     * Resumes delivery by requesting {@code Long.MAX_VALUE} records via {@link #fetch(long)}.
     *
     * @return this stream
     */
    @Override
    public KafkaReadStreamImpl<K, V> resume() {
        return fetch(Long.MAX_VALUE);
    }

    /**
     * Adds {@code amount} to the outstanding demand, saturating at {@code Long.MAX_VALUE},
     * and triggers the consume loop when the resulting demand is positive.
     *
     * @param amount number of additional records requested; must not be negative
     * @return this stream
     * @throws IllegalArgumentException if {@code amount} is negative
     */
    @Override
    public KafkaReadStreamImpl<K, V> fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("Invalid claim " + amount);
        }
        long updated = this.demand.updateAndGet(currentDemand -> {
            long sum = currentDemand + amount;
            // A negative sum means the addition overflowed; clamp to "unbounded".
            return sum < 0L ? Long.MAX_VALUE : sum;
        });
        if (updated > 0L) {
            this.schedule(0L);
        }
        return this;
    }

    /**
     * Returns the current outstanding demand.
     *
     * @return the demand counter's current value
     */
    @Override
    public long demand() {
        return demand.get();
    }

    /**
     * Marks the stream as consuming and triggers the consume loop.
     *
     * @return this stream
     */
    private KafkaReadStreamImpl<K, V> startConsuming() {
        consuming.set(true);
        schedule(0L);
        return this;
    }

    /**
     * Accepts an end handler but does not store it; nothing in this class ever invokes it.
     *
     * @param endHandler ignored
     * @return this stream
     */
    @Override
    public KafkaReadStreamImpl<K, V> endHandler(Handler<Void> endHandler) {
        return this;
    }

    /**
     * Closes the underlying consumer on the worker thread and then shuts the worker down.
     *
     * <p>Only the call that flips {@code closed} from false to true does any work; every other
     * call returns an already-succeeded future. The worker executor is captured once, so the
     * executor that runs the close task is also the one that gets shut down. Re-reading the
     * field when the close completes could shut down a new executor created by a restart in
     * between, and leak the old one.
     *
     * @return future completed when the consumer has been closed, or failed if closing threw
     */
    @Override
    public Future<Void> close() {
        ContextInternal ctx = (ContextInternal)this.context;
        if (!this.closed.compareAndSet(false, true)) {
            return ctx.succeededFuture();
        }
        ExecutorService closingWorker = this.worker;
        // Interrupt a poll that may currently be blocking the worker thread.
        this.consumer.wakeup();
        PromiseInternal<Void> promise = ctx.promise();
        closingWorker.submit(() -> this.lambda$close$34(promise));
        return promise.future().onComplete(v -> closingWorker.shutdownNow());
    }

    /**
     * Looks up the consumer's current position for a partition.
     *
     * @param partition partition to query
     * @return future completed with the position returned by the consumer
     */
    @Override
    public Future<Long> position(TopicPartition partition) {
        BiConsumer<Consumer<K, V>, Promise<Long>> query = (kafka, done) -> {
            long offset = this.consumer.position(partition);
            if (done != null) {
                done.complete(offset);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Looks up offsets by timestamp for several partitions.
     *
     * @param topicPartitionTimestamps timestamp to search for, per partition
     * @return future completed with the map returned by the consumer
     */
    @Override
    public Future<Map<TopicPartition, OffsetAndTimestamp>> offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, OffsetAndTimestamp>>> query = (kafka, done) -> {
            Map<TopicPartition, OffsetAndTimestamp> found = this.consumer.offsetsForTimes(topicPartitionTimestamps);
            if (done != null) {
                done.complete(found);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Looks up the offset by timestamp for a single partition.
     *
     * @param topicPartition partition to query
     * @param timestamp      timestamp to search for
     * @return future completed with the entry for {@code topicPartition} in the consumer's
     *         result (null when the result has no such entry)
     */
    @Override
    public Future<OffsetAndTimestamp> offsetsForTimes(TopicPartition topicPartition, long timestamp) {
        BiConsumer<Consumer<K, V>, Promise<OffsetAndTimestamp>> query = (kafka, done) -> {
            Map<TopicPartition, Long> request = new HashMap<>();
            request.put(topicPartition, timestamp);
            Map<TopicPartition, OffsetAndTimestamp> found = this.consumer.offsetsForTimes(request);
            if (done != null) {
                done.complete(found.get(topicPartition));
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Looks up the beginning offsets of several partitions.
     *
     * @param topicPartitions partitions to query
     * @return future completed with the map returned by the consumer
     */
    @Override
    public Future<Map<TopicPartition, Long>> beginningOffsets(Set<TopicPartition> topicPartitions) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, Long>>> query = (kafka, done) -> {
            Map<TopicPartition, Long> offsets = this.consumer.beginningOffsets(topicPartitions);
            if (done != null) {
                done.complete(offsets);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Looks up the beginning offset of a single partition.
     *
     * @param topicPartition partition to query
     * @return future completed with the entry for {@code topicPartition} in the consumer's result
     */
    @Override
    public Future<Long> beginningOffsets(TopicPartition topicPartition) {
        BiConsumer<Consumer<K, V>, Promise<Long>> query = (kafka, done) -> {
            Set<TopicPartition> request = new HashSet<>();
            request.add(topicPartition);
            Map<TopicPartition, Long> offsets = this.consumer.beginningOffsets(request);
            if (done != null) {
                done.complete(offsets.get(topicPartition));
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Looks up the end offsets of several partitions.
     *
     * @param topicPartitions partitions to query
     * @return future completed with the map returned by the consumer
     */
    @Override
    public Future<Map<TopicPartition, Long>> endOffsets(Set<TopicPartition> topicPartitions) {
        BiConsumer<Consumer<K, V>, Promise<Map<TopicPartition, Long>>> query = (kafka, done) -> {
            Map<TopicPartition, Long> offsets = this.consumer.endOffsets(topicPartitions);
            if (done != null) {
                done.complete(offsets);
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Looks up the end offset of a single partition.
     *
     * @param topicPartition partition to query
     * @return future completed with the entry for {@code topicPartition} in the consumer's result
     */
    @Override
    public Future<Long> endOffsets(TopicPartition topicPartition) {
        BiConsumer<Consumer<K, V>, Promise<Long>> query = (kafka, done) -> {
            Set<TopicPartition> request = new HashSet<>();
            request.add(topicPartition);
            Map<TopicPartition, Long> offsets = this.consumer.endOffsets(request);
            if (done != null) {
                done.complete(offsets.get(topicPartition));
            }
        };
        return this.submitTask2(query);
    }

    /**
     * Exposes the wrapped Kafka consumer.
     *
     * @return the consumer passed to the constructor
     */
    @Override
    public Consumer<K, V> unwrap() {
        return consumer;
    }

    /**
     * Installs the whole-batch handler and triggers the consume loop.
     *
     * @param handler handler to install, or null to clear it
     * @return this stream
     */
    @Override
    public KafkaReadStream batchHandler(Handler<ConsumerRecords<K, V>> handler) {
        batchHandler = handler;
        schedule(0L);
        return this;
    }

    /**
     * Sets the timeout used by the background polling loop.
     *
     * @param timeout timeout passed to the consumer's poll call
     * @return this stream
     */
    @Override
    public KafkaReadStream<K, V> pollTimeout(Duration timeout) {
        pollTimeout = timeout;
        return this;
    }

    /**
     * Runs a single explicit poll on the worker thread.
     *
     * <p>The future is failed with {@link IllegalStateException} when the worker has not been
     * started yet, and also when the stream is found closed once the task runs. In the second
     * case the task used to do nothing at all, so the returned future never completed. A wakeup
     * during the poll completes the future with an empty batch; any other exception fails it.
     *
     * @param timeout timeout passed to the consumer's poll call
     * @return future completed on the Vert.x context with the polled records
     */
    @Override
    public Future<ConsumerRecords<K, V>> poll(Duration timeout) {
        Promise<ConsumerRecords<K, V>> promise = Promise.promise();
        if (this.worker == null) {
            promise.fail(new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"));
            return promise.future();
        }
        this.worker.submit(() -> {
            if (this.closed.get()) {
                // Always settle the promise so callers are not left waiting forever.
                this.context.runOnContext(v -> promise.fail(new IllegalStateException("Consumer is closed")));
                return;
            }
            try {
                ConsumerRecords<K, V> records = this.consumer.poll(timeout);
                this.context.runOnContext(v -> promise.complete(records));
            } catch (WakeupException ignored) {
                // wakeup() is issued by close(); report an empty batch rather than an error.
                this.context.runOnContext(v -> promise.complete(ConsumerRecords.empty()));
            } catch (Exception e) {
                this.context.runOnContext(v -> promise.fail(e));
            }
        });
        return promise.future();
    }

    /**
     * Worker-side body of {@code close()}: closes the consumer and settles {@code promise}.
     *
     * <p>Any runtime exception from {@code Consumer.close()} fails the promise, not just
     * {@code KafkaException}. Previously another exception type escaped into the executor's
     * task and left the promise uncompleted, so the close future never finished.
     *
     * @param promise promise to complete on success or fail on error
     */
    private /* synthetic */ void lambda$close$34(Promise promise) {
        try {
            this.consumer.close();
        }
        catch (RuntimeException ex) {
            promise.fail((Throwable)ex);
            return;
        }
        promise.complete();
    }
}

