/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.consumer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaReadStreamImpl<K, V>
implements KafkaReadStream<K, V> {
    private static final AtomicInteger threadCount = new AtomicInteger(0);
    private final Context context;
    private final AtomicBoolean closed = new AtomicBoolean(true);
    private final Consumer<K, V> consumer;
    private final AtomicBoolean consuming = new AtomicBoolean(false);
    private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
    private final AtomicBoolean polling = new AtomicBoolean(false);
    private Handler<ConsumerRecord<K, V>> recordHandler;
    private Handler<Throwable> exceptionHandler;
    private Iterator<ConsumerRecord<K, V>> current;
    private Handler<ConsumerRecords<K, V>> batchHandler;
    private Handler<Set<TopicPartition>> partitionsRevokedHandler;
    private Handler<Set<TopicPartition>> partitionsAssignedHandler;
    private long pollTimeout = 1000L;
    private ExecutorService worker;
    private final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener(){

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            Handler handler = KafkaReadStreamImpl.this.partitionsRevokedHandler;
            if (handler != null) {
                KafkaReadStreamImpl.this.context.runOnContext(v -> handler.handle(Helper.toSet(partitions)));
            }
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            Handler handler = KafkaReadStreamImpl.this.partitionsAssignedHandler;
            if (handler != null) {
                KafkaReadStreamImpl.this.context.runOnContext(v -> handler.handle(Helper.toSet(partitions)));
            }
        }
    };

    public KafkaReadStreamImpl(Context context, Consumer<K, V> consumer) {
        this.context = context;
        this.consumer = consumer;
    }

    private <T> void start(BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) {
        this.worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "vert.x-kafka-consumer-thread-" + threadCount.getAndIncrement()));
        this.submitTaskWhenStarted(task, handler);
    }

    private <T> void submitTaskWhenStarted(BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) {
        if (this.worker == null) {
            throw new IllegalStateException();
        }
        this.worker.submit(() -> {
            block4: {
                Promise future = null;
                if (handler != null) {
                    future = Promise.promise();
                    future.future().setHandler(event -> this.context.runOnContext(v -> handler.handle(event)));
                }
                try {
                    task.accept(this.consumer, future);
                }
                catch (Exception e) {
                    if (future != null) {
                        future.tryFail((Throwable)e);
                    }
                    if (this.exceptionHandler == null) break block4;
                    this.exceptionHandler.handle((Object)e);
                }
            }
        });
    }

    private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
        if (this.polling.compareAndSet(false, true)) {
            this.worker.submit(() -> {
                block9: {
                    boolean submitted = false;
                    try {
                        if (this.closed.get()) break block9;
                        try {
                            ConsumerRecords records = this.consumer.poll(this.pollTimeout);
                            if (records != null && records.count() > 0) {
                                submitted = true;
                                this.context.runOnContext(v -> {
                                    this.polling.set(false);
                                    handler.handle((Object)records);
                                });
                            }
                        }
                        catch (WakeupException records) {
                        }
                        catch (Exception e) {
                            if (this.exceptionHandler != null) {
                                this.exceptionHandler.handle((Object)e);
                            }
                        }
                    }
                    finally {
                        if (!submitted) {
                            this.context.runOnContext(v -> {
                                this.polling.set(false);
                                this.schedule(0L);
                            });
                        }
                    }
                }
            });
        }
    }

    private void schedule(long delay) {
        Handler<ConsumerRecord<K, V>> handler = this.recordHandler;
        if (this.consuming.get() && this.demand.get() > 0L && handler != null) {
            this.context.runOnContext(v1 -> {
                if (delay > 0L) {
                    this.context.owner().setTimer(delay, v2 -> this.run(handler));
                } else {
                    this.run(handler);
                }
            });
        }
    }

    private void run(Handler<ConsumerRecord<K, V>> handler) {
        if (this.closed.get()) {
            return;
        }
        if (this.current == null || !this.current.hasNext()) {
            this.pollRecords(records -> {
                if (records != null && records.count() > 0) {
                    this.current = records.iterator();
                    if (this.batchHandler != null) {
                        this.batchHandler.handle(records);
                    }
                    this.schedule(0L);
                } else {
                    this.schedule(1L);
                }
            });
        } else {
            int count = 0;
            block0: while (this.current.hasNext() && count++ < 10) {
                long v;
                while ((v = this.demand.get()) > 0L) {
                    if (v != Long.MAX_VALUE && !this.demand.compareAndSet(v, v - 1L)) continue;
                    ConsumerRecord<K, V> next = this.current.next();
                    handler.handle(next);
                    continue block0;
                }
                break block0;
            }
            this.schedule(0L);
        }
    }

    protected <T> void submitTask(BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) {
        if (this.closed.compareAndSet(true, false)) {
            this.start(task, handler);
        } else {
            this.submitTaskWhenStarted(task, handler);
        }
    }

    @Override
    public Future<Void> pause(Set<TopicPartition> topicPartitions) {
        Promise promise = Promise.promise();
        this.pause(topicPartitions, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> pause(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        this.submitTask((consumer, future) -> {
            consumer.pause((Collection)topicPartitions);
            if (future != null) {
                future.complete();
            }
        }, completionHandler);
        return this;
    }

    @Override
    public void paused(Handler<AsyncResult<Set<TopicPartition>>> handler) {
        this.submitTask((consumer, future) -> {
            Set result = consumer.paused();
            if (future != null) {
                future.complete((Object)result);
            }
        }, handler);
    }

    @Override
    public Future<Set<TopicPartition>> paused() {
        Promise promise = Promise.promise();
        this.paused((Handler<AsyncResult<Set<TopicPartition>>>)promise);
        return promise.future();
    }

    @Override
    public Future<Void> resume(Set<TopicPartition> topicPartitions) {
        Promise promise = Promise.promise();
        this.resume(topicPartitions, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> resume(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        this.submitTask((consumer, future) -> {
            consumer.resume((Collection)topicPartitions);
            if (future != null) {
                future.complete();
            }
        }, completionHandler);
        return this;
    }

    @Override
    public void committed(TopicPartition topicPartition, Handler<AsyncResult<OffsetAndMetadata>> handler) {
        this.submitTask((consumer, future) -> {
            OffsetAndMetadata result = consumer.committed(topicPartition);
            if (future != null) {
                future.complete((Object)result);
            }
        }, handler);
    }

    @Override
    public Future<OffsetAndMetadata> committed(TopicPartition topicPartition) {
        Promise promise = Promise.promise();
        this.committed(topicPartition, (Handler<AsyncResult<OffsetAndMetadata>>)promise);
        return promise.future();
    }

    @Override
    public Future<Void> seekToEnd(Set<TopicPartition> topicPartitions) {
        Promise promise = Promise.promise();
        this.seekToEnd(topicPartitions, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> seekToEnd(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        this.context.runOnContext(r -> {
            this.current = null;
            this.submitTask((consumer, future) -> {
                consumer.seekToEnd((Collection)topicPartitions);
                if (future != null) {
                    future.complete();
                }
            }, completionHandler);
        });
        return this;
    }

    @Override
    public Future<Void> seekToBeginning(Set<TopicPartition> topicPartitions) {
        Promise promise = Promise.promise();
        this.seekToBeginning(topicPartitions, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> seekToBeginning(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Void>> completionHandler) {
        this.context.runOnContext(r -> {
            this.current = null;
            this.submitTask((consumer, future) -> {
                consumer.seekToBeginning((Collection)topicPartitions);
                if (future != null) {
                    future.complete();
                }
            }, completionHandler);
        });
        return this;
    }

    @Override
    public Future<Void> seek(TopicPartition topicPartition, long offset) {
        Promise promise = Promise.promise();
        this.seek(topicPartition, offset, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> seek(TopicPartition topicPartition, long offset, Handler<AsyncResult<Void>> completionHandler) {
        this.context.runOnContext(r -> {
            this.current = null;
            this.submitTask((consumer, future) -> {
                consumer.seek(topicPartition, offset);
                if (future != null) {
                    future.complete();
                }
            }, completionHandler);
        });
        return this;
    }

    @Override
    public KafkaReadStream<K, V> partitionsRevokedHandler(Handler<Set<TopicPartition>> handler) {
        this.partitionsRevokedHandler = handler;
        return this;
    }

    @Override
    public KafkaReadStream<K, V> partitionsAssignedHandler(Handler<Set<TopicPartition>> handler) {
        this.partitionsAssignedHandler = handler;
        return this;
    }

    @Override
    public Future<Void> subscribe(Set<String> topics) {
        Promise promise = Promise.promise();
        this.subscribe(topics, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> subscribe(Set<String> topics, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer handler = (consumer, future) -> {
            consumer.subscribe((Collection)topics, this.rebalanceListener);
            this.startConsuming();
            if (future != null) {
                future.complete();
            }
        };
        if (this.closed.compareAndSet(true, false)) {
            this.start(handler, completionHandler);
        } else {
            this.submitTask(handler, completionHandler);
        }
        return this;
    }

    @Override
    public KafkaReadStream<K, V> subscribe(Pattern pattern, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer handler = (consumer, future) -> {
            consumer.subscribe(pattern, this.rebalanceListener);
            this.startConsuming();
            if (future != null) {
                future.complete();
            }
        };
        if (this.closed.compareAndSet(true, false)) {
            this.start(handler, completionHandler);
        } else {
            this.submitTask(handler, completionHandler);
        }
        return this;
    }

    @Override
    public Future<Void> subscribe(Pattern pattern) {
        Promise promise = Promise.promise();
        this.subscribe(pattern, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public Future<Void> unsubscribe() {
        Promise promise = Promise.promise();
        this.unsubscribe((Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> unsubscribe(Handler<AsyncResult<Void>> completionHandler) {
        this.submitTask((consumer, future) -> {
            consumer.unsubscribe();
            if (future != null) {
                future.complete();
            }
        }, completionHandler);
        return this;
    }

    @Override
    public KafkaReadStream<K, V> subscription(Handler<AsyncResult<Set<String>>> handler) {
        this.submitTask((consumer, future) -> {
            Set subscription = consumer.subscription();
            if (future != null) {
                future.complete((Object)subscription);
            }
        }, handler);
        return this;
    }

    @Override
    public Future<Set<String>> subscription() {
        Promise promise = Promise.promise();
        this.subscription((Handler<AsyncResult<Set<String>>>)promise);
        return promise.future();
    }

    @Override
    public Future<Void> assign(Set<TopicPartition> partitions) {
        Promise promise = Promise.promise();
        this.assign(partitions, (Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> assign(Set<TopicPartition> partitions, Handler<AsyncResult<Void>> completionHandler) {
        BiConsumer handler = (consumer, future) -> {
            consumer.assign((Collection)partitions);
            this.startConsuming();
            if (future != null) {
                future.complete();
            }
        };
        if (this.closed.compareAndSet(true, false)) {
            this.start(handler, completionHandler);
        } else {
            this.submitTask(handler, completionHandler);
        }
        return this;
    }

    @Override
    public KafkaReadStream<K, V> assignment(Handler<AsyncResult<Set<TopicPartition>>> handler) {
        this.submitTask((consumer, future) -> {
            Set partitions = consumer.assignment();
            if (future != null) {
                future.complete((Object)partitions);
            }
        }, handler);
        return this;
    }

    @Override
    public Future<Set<TopicPartition>> assignment() {
        Promise promise = Promise.promise();
        this.assignment((Handler<AsyncResult<Set<TopicPartition>>>)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStream<K, V> listTopics(Handler<AsyncResult<Map<String, List<PartitionInfo>>>> handler) {
        this.submitTask((consumer, future) -> {
            Map topics = consumer.listTopics();
            if (future != null) {
                future.complete((Object)topics);
            }
        }, handler);
        return this;
    }

    @Override
    public Future<Map<String, List<PartitionInfo>>> listTopics() {
        Promise promise = Promise.promise();
        this.listTopics((Handler<AsyncResult<Map<String, List<PartitionInfo>>>>)promise);
        return promise.future();
    }

    @Override
    public Future<Map<TopicPartition, OffsetAndMetadata>> commit() {
        Promise promise = Promise.promise();
        this.commit((Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>>)promise);
        return promise.future();
    }

    @Override
    public void commit(Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
        this.commit(null, completionHandler);
    }

    @Override
    public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        Promise promise = Promise.promise();
        this.commit(offsets, (Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>>)promise);
        return promise.future();
    }

    @Override
    public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
        this.submitTask((consumer, future) -> {
            if (offsets == null) {
                consumer.commitSync();
            } else {
                consumer.commitSync(offsets);
            }
            if (future != null) {
                future.complete((Object)offsets);
            }
        }, completionHandler);
    }

    @Override
    public KafkaReadStreamImpl<K, V> partitionsFor(String topic, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        this.submitTask((consumer, future) -> {
            List partitions = consumer.partitionsFor(topic);
            if (future != null) {
                future.complete((Object)partitions);
            }
        }, handler);
        return this;
    }

    @Override
    public Future<List<PartitionInfo>> partitionsFor(String topic) {
        Promise promise = Promise.promise();
        this.partitionsFor(topic, (Handler)promise);
        return promise.future();
    }

    @Override
    public KafkaReadStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override
    public KafkaReadStreamImpl<K, V> handler(Handler<ConsumerRecord<K, V>> handler) {
        this.recordHandler = handler;
        this.schedule(0L);
        return this;
    }

    @Override
    public KafkaReadStreamImpl<K, V> pause() {
        this.demand.set(0L);
        return this;
    }

    @Override
    public KafkaReadStreamImpl<K, V> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    @Override
    public KafkaReadStreamImpl<K, V> fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException("Invalid claim " + amount);
        }
        long op = this.demand.updateAndGet(val -> {
            if ((val += amount) < 0L) {
                val = Long.MAX_VALUE;
            }
            return val;
        });
        if (op > 0L) {
            this.schedule(0L);
        }
        return this;
    }

    private KafkaReadStreamImpl<K, V> startConsuming() {
        this.consuming.set(true);
        this.schedule(0L);
        return this;
    }

    @Override
    public KafkaReadStreamImpl<K, V> endHandler(Handler<Void> endHandler) {
        return this;
    }

    @Override
    public Future<Void> close() {
        Promise promise = Promise.promise();
        this.close((Handler<AsyncResult<Void>>)promise);
        return promise.future();
    }

    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        if (this.closed.compareAndSet(false, true)) {
            this.worker.submit(() -> {
                this.consumer.close();
                this.context.runOnContext(v -> {
                    this.worker.shutdownNow();
                    if (completionHandler != null) {
                        completionHandler.handle((Object)Future.succeededFuture());
                    }
                });
            });
            this.consumer.wakeup();
        } else if (completionHandler != null) {
            completionHandler.handle((Object)Future.succeededFuture());
        }
    }

    @Override
    public void position(TopicPartition partition, Handler<AsyncResult<Long>> handler) {
        this.submitTask((consumer, future) -> {
            long pos = this.consumer.position(partition);
            if (future != null) {
                future.complete((Object)pos);
            }
        }, handler);
    }

    @Override
    public Future<Long> position(TopicPartition partition) {
        Promise promise = Promise.promise();
        this.position(partition, (Handler<AsyncResult<Long>>)promise);
        return promise.future();
    }

    @Override
    public void offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps, Handler<AsyncResult<Map<TopicPartition, OffsetAndTimestamp>>> handler) {
        this.submitTask((consumer, future) -> {
            Map offsetsForTimes = this.consumer.offsetsForTimes(topicPartitionTimestamps);
            if (future != null) {
                future.complete((Object)offsetsForTimes);
            }
        }, handler);
    }

    @Override
    public Future<Map<TopicPartition, OffsetAndTimestamp>> offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps) {
        Promise promise = Promise.promise();
        this.offsetsForTimes(topicPartitionTimestamps, (Handler<AsyncResult<Map<TopicPartition, OffsetAndTimestamp>>>)promise);
        return promise.future();
    }

    @Override
    public void offsetsForTimes(TopicPartition topicPartition, long timestamp, Handler<AsyncResult<OffsetAndTimestamp>> handler) {
        this.submitTask((consumer, future) -> {
            HashMap<TopicPartition, Long> input = new HashMap<TopicPartition, Long>();
            input.put(topicPartition, timestamp);
            Map offsetsForTimes = this.consumer.offsetsForTimes(input);
            if (future != null) {
                future.complete(offsetsForTimes.get(topicPartition));
            }
        }, handler);
    }

    @Override
    public Future<OffsetAndTimestamp> offsetsForTimes(TopicPartition topicPartition, long timestamp) {
        Promise promise = Promise.promise();
        this.offsetsForTimes(topicPartition, timestamp, (Handler<AsyncResult<OffsetAndTimestamp>>)promise);
        return promise.future();
    }

    @Override
    public void beginningOffsets(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Map<TopicPartition, Long>>> handler) {
        this.submitTask((consumer, future) -> {
            Map beginningOffsets = this.consumer.beginningOffsets((Collection)topicPartitions);
            if (future != null) {
                future.complete((Object)beginningOffsets);
            }
        }, handler);
    }

    @Override
    public Future<Map<TopicPartition, Long>> beginningOffsets(Set<TopicPartition> topicPartitions) {
        Promise promise = Promise.promise();
        this.beginningOffsets(topicPartitions, (Handler<AsyncResult<Map<TopicPartition, Long>>>)promise);
        return promise.future();
    }

    @Override
    public void beginningOffsets(TopicPartition topicPartition, Handler<AsyncResult<Long>> handler) {
        this.submitTask((consumer, future) -> {
            HashSet<TopicPartition> input = new HashSet<TopicPartition>();
            input.add(topicPartition);
            Map beginningOffsets = this.consumer.beginningOffsets(input);
            if (future != null) {
                future.complete(beginningOffsets.get(topicPartition));
            }
        }, handler);
    }

    @Override
    public Future<Long> beginningOffsets(TopicPartition topicPartition) {
        Promise promise = Promise.promise();
        this.beginningOffsets(topicPartition, (Handler<AsyncResult<Long>>)promise);
        return promise.future();
    }

    @Override
    public void endOffsets(Set<TopicPartition> topicPartitions, Handler<AsyncResult<Map<TopicPartition, Long>>> handler) {
        this.submitTask((consumer, future) -> {
            Map endOffsets = this.consumer.endOffsets((Collection)topicPartitions);
            if (future != null) {
                future.complete((Object)endOffsets);
            }
        }, handler);
    }

    @Override
    public Future<Map<TopicPartition, Long>> endOffsets(Set<TopicPartition> topicPartitions) {
        Promise promise = Promise.promise();
        this.endOffsets(topicPartitions, (Handler<AsyncResult<Map<TopicPartition, Long>>>)promise);
        return promise.future();
    }

    @Override
    public void endOffsets(TopicPartition topicPartition, Handler<AsyncResult<Long>> handler) {
        this.submitTask((consumer, future) -> {
            HashSet<TopicPartition> input = new HashSet<TopicPartition>();
            input.add(topicPartition);
            Map endOffsets = this.consumer.endOffsets(input);
            if (future != null) {
                future.complete(endOffsets.get(topicPartition));
            }
        }, handler);
    }

    @Override
    public Future<Long> endOffsets(TopicPartition topicPartition) {
        Promise promise = Promise.promise();
        this.endOffsets(topicPartition, (Handler<AsyncResult<Long>>)promise);
        return promise.future();
    }

    @Override
    public Consumer<K, V> unwrap() {
        return this.consumer;
    }

    @Override
    public KafkaReadStream batchHandler(Handler<ConsumerRecords<K, V>> handler) {
        this.batchHandler = handler;
        return this;
    }

    @Override
    public KafkaReadStream<K, V> pollTimeout(long timeout) {
        this.pollTimeout = timeout;
        return this;
    }

    @Override
    public void poll(long timeout, Handler<AsyncResult<ConsumerRecords<K, V>>> handler) {
        this.worker.submit(() -> {
            if (!this.closed.get()) {
                try {
                    ConsumerRecords records = this.consumer.poll(timeout);
                    this.context.runOnContext(v -> handler.handle((Object)Future.succeededFuture((Object)records)));
                }
                catch (WakeupException ignore) {
                    this.context.runOnContext(v -> handler.handle((Object)Future.succeededFuture((Object)ConsumerRecords.empty())));
                }
                catch (Exception e) {
                    this.context.runOnContext(v -> handler.handle((Object)Future.failedFuture((Throwable)e)));
                }
            }
        });
    }

    @Override
    public Future<ConsumerRecords<K, V>> poll(long timeout) {
        Promise promise = Promise.promise();
        this.poll(timeout, (Handler<AsyncResult<ConsumerRecords<K, V>>>)promise);
        return promise.future();
    }
}

