/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.PulsarConsumerCoordinator;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarKafkaConsumer<K, V>
implements Consumer<K, V>,
MessageListener<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(PulsarKafkaConsumer.class);
    private static final long serialVersionUID = 1L;
    private final PulsarClient client;
    private final Schema<K> keySchema;
    private final Schema<V> valueSchema;
    private final String groupId;
    private final boolean isAutoCommit;
    private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>>();
    private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<TopicPartition, Long>();
    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
    private final Set<TopicPartition> unpolledPartitions = new HashSet<TopicPartition>();
    private final SubscriptionInitialPosition strategy;
    private List<ConsumerInterceptor<K, V>> interceptors;
    private volatile boolean closed = false;
    private final int maxRecordsInSinglePoll;
    private final Properties properties;
    private final ConsumerConfig consumerCfg;
    private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<QueueItem>(1000);

    public PulsarKafkaConsumer(Map<String, Object> configs) {
        this(new ConsumerConfig(configs), null, null);
    }

    public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(configs), new PulsarKafkaSchema<K>(keyDeserializer), new PulsarKafkaSchema<V>(valueDeserializer));
    }

    public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
        this(new ConsumerConfig(configs), keySchema, valueSchema);
    }

    public PulsarKafkaConsumer(Properties properties) {
        this(new ConsumerConfig(properties), null, null);
    }

    public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(properties), new PulsarKafkaSchema<K>(keyDeserializer), new PulsarKafkaSchema<V>(valueDeserializer));
    }

    public PulsarKafkaConsumer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
        this(new ConsumerConfig(properties), keySchema, valueSchema);
    }

    private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
        this.consumerCfg = consumerConfig;
        if (keySchema == null) {
            Deserializer kafkaKeyDeserializer = (Deserializer)consumerConfig.getConfiguredInstance("key.deserializer", Deserializer.class);
            kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
            this.keySchema = new PulsarKafkaSchema<K>(kafkaKeyDeserializer);
        } else {
            this.keySchema = keySchema;
            consumerConfig.ignore("key.deserializer");
        }
        if (valueSchema == null) {
            Deserializer kafkaValueDeserializer = (Deserializer)consumerConfig.getConfiguredInstance("value.deserializer", Deserializer.class);
            kafkaValueDeserializer.configure(consumerConfig.originals(), true);
            this.valueSchema = new PulsarKafkaSchema<V>(kafkaValueDeserializer);
        } else {
            this.valueSchema = valueSchema;
            consumerConfig.ignore("value.deserializer");
        }
        this.groupId = consumerConfig.getString("group.id") == null ? consumerConfig.getString("client.id") : consumerConfig.getString("group.id");
        Preconditions.checkNotNull((Object)this.groupId, (Object)"groupId cannot be null");
        this.isAutoCommit = consumerConfig.getBoolean("enable.auto.commit");
        this.strategy = this.getStrategy(consumerConfig.getString("auto.offset.reset"));
        log.info("Offset reset strategy has been assigned value {}", (Object)this.strategy);
        String serviceUrl = (String)consumerConfig.getList("bootstrap.servers").get(0);
        this.maxRecordsInSinglePoll = consumerConfig.values().containsKey("max.poll.records") ? consumerConfig.getInt("max.poll.records") : 1000;
        this.interceptors = consumerConfig.getConfiguredInstances("interceptor.classes", ConsumerInterceptor.class);
        this.properties = new Properties();
        log.info("config originals: {}", (Object)consumerConfig.originals());
        consumerConfig.originals().forEach((k, v) -> {
            log.info("Setting k = {} v = {}", k, v);
            if (k != null && v != null) {
                this.properties.put(k, v);
            }
        });
        ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(this.properties);
        clientBuilder.enableTcpNoDelay(false);
        try {
            this.client = clientBuilder.serviceUrl(serviceUrl).build();
        }
        catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    private SubscriptionInitialPosition getStrategy(String strategy) {
        switch (strategy) {
            case "earliest": {
                return SubscriptionInitialPosition.Earliest;
            }
        }
        return SubscriptionInitialPosition.Latest;
    }

    public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
        block2: {
            try {
                this.receivedMessages.put(new QueueItem(consumer, msg));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (this.closed) break block2;
                throw new RuntimeException(e);
            }
        }
    }

    public Set<TopicPartition> assignment() {
        return this.consumers.keySet();
    }

    public Set<String> subscription() {
        return this.consumers.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
    }

    public void subscribe(Collection<String> topics) {
        this.subscribe(topics, null);
    }

    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>();
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        try {
            for (String topic : topics) {
                int numberOfPartitions = (Integer)((PulsarClientImpl)this.client).getNumberOfPartitions(topic).get();
                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(this.client, this.properties);
                consumerBuilder.subscriptionType(SubscriptionType.Failover);
                consumerBuilder.messageListener((MessageListener)this);
                consumerBuilder.subscriptionName(this.groupId);
                if (numberOfPartitions > 1) {
                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
                    int i = 0;
                    while (i < numberOfPartitions) {
                        String partitionName = TopicName.get((String)topic).getPartition(i).toString();
                        CompletableFuture future = consumerBuilder.clone().topic(new String[]{partitionName}).subscribeAsync();
                        int partitionIndex = i++;
                        TopicPartition tp = this.normalizedTopicPartition(topic, partitionIndex);
                        futures.add(future.thenApply(consumer -> {
                            log.info("Add consumer {} for partition {}", consumer, (Object)tp);
                            this.consumers.putIfAbsent(tp, (org.apache.pulsar.client.api.Consumer<byte[]>)consumer);
                            return consumer;
                        }));
                        topicPartitions.add(tp);
                    }
                    continue;
                }
                CompletableFuture future = consumerBuilder.topic(new String[]{topic}).subscribeAsync();
                TopicPartition tp = this.normalizedTopicPartition(topic, 0);
                futures.add(future.thenApply(consumer -> {
                    log.info("Add consumer {} for partition {}", consumer, (Object)tp);
                    this.consumers.putIfAbsent(tp, (org.apache.pulsar.client.api.Consumer<byte[]>)consumer);
                    return consumer;
                }));
                topicPartitions.add(tp);
            }
            this.unpolledPartitions.addAll(topicPartitions);
            futures.forEach(CompletableFuture::join);
            PulsarConsumerCoordinator.invokePartitionsAssigned(this.groupId, this.consumerCfg, Lists.newArrayList(this.consumers.keySet()));
            if (callback != null) {
                callback.onPartitionsAssigned(topicPartitions);
            }
        }
        catch (Exception e) {
            futures.forEach(f -> {
                try {
                    ((org.apache.pulsar.client.api.Consumer)f.get()).close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
            throw new RuntimeException(e);
        }
    }

    private TopicPartition normalizedTopicPartition(TopicPartition tp) {
        return this.normalizedTopicPartition(tp.topic(), tp.partition());
    }

    private TopicPartition normalizedTopicPartition(String topic, int partition) {
        String name = TopicName.get((String)topic).getPartitionedTopicName();
        return new TopicPartition(name, partition);
    }

    public void assign(Collection<TopicPartition> partitions) {
        Set<String> topics = partitions.stream().map(p -> p.topic()).collect(Collectors.toSet());
        this.subscribe(topics);
    }

    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
    }

    public void subscribe(Pattern pattern) {
        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
    }

    public void unsubscribe() {
        this.consumers.values().forEach(c -> {
            try {
                c.unsubscribe();
            }
            catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public ConsumerRecords<K, V> poll(long timeoutMillis) {
        try {
            QueueItem item = this.receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                return ConsumerRecords.EMPTY;
            }
            HashMap<TopicPartition, List> records = new HashMap<TopicPartition, List>();
            int numberOfRecords = 0;
            while (item != null) {
                TopicName topicName = TopicName.get((String)item.consumer.getTopic());
                String topic = topicName.getPartitionedTopicName();
                int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                Message<byte[]> msg = item.message;
                MessageIdImpl msgId = (MessageIdImpl)msg.getMessageId();
                long offset = MessageIdUtils.getOffset((MessageId)msgId);
                TopicPartition tp = new TopicPartition(topic, partition);
                if (this.lastReceivedOffset.get(tp) == null && !this.unpolledPartitions.contains(tp)) {
                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", (Object)tp);
                    this.resetOffsets(tp);
                }
                K key = this.getKey(topic, msg);
                if (this.valueSchema instanceof PulsarKafkaSchema) {
                    ((PulsarKafkaSchema)this.valueSchema).setTopic(topic);
                }
                Object value = this.valueSchema.decode(msg.getData());
                TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
                long timestamp = msg.getPublishTime();
                if (msg.getEventTime() > 0L) {
                    timestamp = msg.getEventTime();
                    timestampType = TimestampType.CREATE_TIME;
                }
                ConsumerRecord consumerRecord = new ConsumerRecord(topic, partition, offset, timestamp, timestampType, -1L, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
                records.computeIfAbsent(tp, k -> new ArrayList()).add(consumerRecord);
                this.lastReceivedOffset.put(tp, offset);
                this.unpolledPartitions.remove(tp);
                if (++numberOfRecords >= this.maxRecordsInSinglePoll) break;
                item = this.receivedMessages.poll(0L, TimeUnit.MILLISECONDS);
            }
            if (this.isAutoCommit && !records.isEmpty()) {
                this.commitAsync();
            }
            return this.applyConsumerInterceptorsOnConsume(this.interceptors, new ConsumerRecords(records));
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public ConsumerRecords<K, V> poll(Duration duration) {
        return this.poll(duration.toMillis());
    }

    private K getKey(String topic, Message<byte[]> msg) {
        if (!msg.hasKey()) {
            return null;
        }
        if (this.keySchema instanceof PulsarKafkaSchema) {
            PulsarKafkaSchema pulsarKafkaSchema = (PulsarKafkaSchema)this.keySchema;
            Deserializer kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
            if (kafkaDeserializer instanceof StringDeserializer) {
                return (K)msg.getKey();
            }
            pulsarKafkaSchema.setTopic(topic);
        }
        byte[] data = Base64.getDecoder().decode(msg.getKey());
        return (K)this.keySchema.decode(data);
    }

    public void commitSync() {
        try {
            this.doCommitOffsets(this.getCurrentOffsetsMap()).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void commitSync(Duration duration) {
        this.commitSync();
    }

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        try {
            this.doCommitOffsets(offsets).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
        this.commitSync(map);
    }

    public void commitAsync() {
        this.doCommitOffsets(this.getCurrentOffsetsMap());
    }

    public void commitAsync(OffsetCommitCallback callback) {
        Map<TopicPartition, OffsetAndMetadata> offsets = this.getCurrentOffsetsMap();
        this.doCommitOffsets(offsets).handle((v, throwable) -> {
            callback.onComplete(offsets, throwable != null ? new Exception((Throwable)throwable) : null);
            return null;
        });
    }

    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.doCommitOffsets(offsets).handle((v, throwable) -> {
            callback.onComplete(offsets, throwable != null ? new Exception((Throwable)throwable) : null);
            return null;
        });
    }

    private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
        Preconditions.checkNotNull(offsets);
        ArrayList futures = new ArrayList();
        this.applyConsumerInterceptorsOnCommit(this.interceptors, offsets);
        offsets.forEach((tp, offsetAndMetadata) -> {
            TopicPartition topicPartition = this.normalizedTopicPartition((TopicPartition)tp);
            org.apache.pulsar.client.api.Consumer consumer = (org.apache.pulsar.client.api.Consumer)this.consumers.get(topicPartition);
            this.lastCommittedOffset.put((TopicPartition)tp, (OffsetAndMetadata)offsetAndMetadata);
            futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId((long)offsetAndMetadata.offset())));
        });
        return FutureUtil.waitForAll(futures);
    }

    private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.lastReceivedOffset.forEach((topicPartition, offset) -> {
            OffsetAndMetadata om = new OffsetAndMetadata(offset.longValue());
            offsets.put((TopicPartition)topicPartition, om);
        });
        return offsets;
    }

    private ConsumerRecords applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>> interceptors, ConsumerRecords consumerRecords) {
        ConsumerRecords processedConsumerRecords = consumerRecords;
        for (ConsumerInterceptor<K, V> interceptor : interceptors) {
            try {
                processedConsumerRecords = interceptor.onConsume(processedConsumerRecords);
            }
            catch (Exception e) {
                log.warn("Error executing onConsume for interceptor {}.", (Object)interceptor.getClass().getCanonicalName(), (Object)e);
            }
        }
        return processedConsumerRecords;
    }

    private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : interceptors) {
            try {
                interceptor.onCommit(offsets);
            }
            catch (Exception e) {
                log.warn("Error executing onCommit for interceptor {}.", (Object)interceptor.getClass().getCanonicalName(), (Object)e);
            }
        }
    }

    public void seek(TopicPartition partition, long offset) {
        MessageId msgId = MessageIdUtils.getMessageId((long)offset);
        TopicPartition topicPartition = this.normalizedTopicPartition(partition);
        org.apache.pulsar.client.api.Consumer c = (org.apache.pulsar.client.api.Consumer)this.consumers.get(topicPartition);
        if (c == null) {
            throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
        }
        try {
            c.seek(msgId);
        }
        catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        this.seek(topicPartition, offsetAndMetadata.offset());
    }

    public void seekToBeginning(Collection<TopicPartition> partitions) {
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        if (partitions.isEmpty()) {
            partitions = this.consumers.keySet();
        }
        this.lastCommittedOffset.clear();
        this.lastReceivedOffset.clear();
        for (TopicPartition tp : partitions) {
            TopicPartition normalizedTp = this.normalizedTopicPartition(tp);
            org.apache.pulsar.client.api.Consumer c = (org.apache.pulsar.client.api.Consumer)this.consumers.get(normalizedTp);
            if (c == null) {
                futures.add(FutureUtil.failedFuture((Throwable)new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
                continue;
            }
            futures.add(c.seekAsync(MessageId.earliest));
            this.unpolledPartitions.add(tp);
        }
        FutureUtil.waitForAll(futures).join();
    }

    public void seekToEnd(Collection<TopicPartition> partitions) {
        ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
        if (partitions.isEmpty()) {
            partitions = this.consumers.keySet();
        }
        this.lastCommittedOffset.clear();
        this.lastReceivedOffset.clear();
        for (TopicPartition tp : partitions) {
            TopicPartition normalizedTp = this.normalizedTopicPartition(tp);
            org.apache.pulsar.client.api.Consumer c = (org.apache.pulsar.client.api.Consumer)this.consumers.get(normalizedTp);
            if (c == null) {
                futures.add(FutureUtil.failedFuture((Throwable)new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
                continue;
            }
            futures.add(c.seekAsync(MessageId.latest));
            this.unpolledPartitions.add(tp);
        }
        FutureUtil.waitForAll(futures).join();
    }

    public long position(TopicPartition partition) {
        Long offset = this.lastReceivedOffset.get(partition);
        if (offset == null && !this.unpolledPartitions.contains(partition)) {
            return this.resetOffsets(partition).getValue();
        }
        return this.unpolledPartitions.contains(partition) ? 0L : offset;
    }

    public long position(TopicPartition topicPartition, Duration duration) {
        return this.position(topicPartition);
    }

    private SubscriptionInitialPosition resetOffsets(TopicPartition partition) {
        log.info("Resetting partition {} and seeking to {} position", (Object)partition, (Object)this.strategy);
        if (this.strategy == SubscriptionInitialPosition.Earliest) {
            this.seekToBeginning(Collections.singleton(partition));
        } else {
            this.seekToEnd(Collections.singleton(partition));
        }
        return this.strategy;
    }

    public OffsetAndMetadata committed(TopicPartition partition) {
        return this.lastCommittedOffset.get(partition);
    }

    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
        return this.committed(topicPartition);
    }

    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        return partitions.stream().map(x -> new AbstractMap.SimpleEntry<TopicPartition, OffsetAndMetadata>((TopicPartition)x, this.committed((TopicPartition)x))).filter(entry -> entry.getValue() != null).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
    }

    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration duration) {
        return this.committed(partitions);
    }

    public Map<MetricName, ? extends Metric> metrics() {
        throw new UnsupportedOperationException();
    }

    public List<PartitionInfo> partitionsFor(String topic) {
        throw new UnsupportedOperationException();
    }

    public List<PartitionInfo> partitionsFor(String s, Duration duration) {
        throw new UnsupportedOperationException();
    }

    public Map<String, List<PartitionInfo>> listTopics() {
        throw new UnsupportedOperationException();
    }

    public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
        throw new UnsupportedOperationException();
    }

    public Set<TopicPartition> paused() {
        return this.consumers.keySet();
    }

    public void pause(Collection<TopicPartition> partitions) {
        partitions.forEach(p -> {
            TopicPartition topicPartition = this.normalizedTopicPartition((TopicPartition)p);
            org.apache.pulsar.client.api.Consumer c = (org.apache.pulsar.client.api.Consumer)this.consumers.get(topicPartition);
            if (c != null) {
                c.pause();
            }
        });
    }

    public void resume(Collection<TopicPartition> partitions) {
        partitions.forEach(p -> {
            TopicPartition topicPartition = this.normalizedTopicPartition((TopicPartition)p);
            org.apache.pulsar.client.api.Consumer c = (org.apache.pulsar.client.api.Consumer)this.consumers.get(topicPartition);
            if (c != null) {
                c.resume();
            }
        });
    }

    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        throw new UnsupportedOperationException();
    }

    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
        throw new UnsupportedOperationException();
    }

    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        throw new UnsupportedOperationException();
    }

    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
        throw new UnsupportedOperationException();
    }

    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        throw new UnsupportedOperationException();
    }

    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
        throw new UnsupportedOperationException();
    }

    public ConsumerGroupMetadata groupMetadata() {
        throw new UnsupportedOperationException();
    }

    public void enforceRebalance() {
        log.info("enforceRebalance() is called but ignored");
    }

    public void close() {
        this.close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public void close(long timeout, TimeUnit unit) {
        try {
            this.closed = true;
            if (this.isAutoCommit) {
                this.commitAsync();
            }
            this.client.closeAsync().get(timeout, unit);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public void close(Duration duration) {
        this.close(duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    public void wakeup() {
        throw new UnsupportedOperationException();
    }

    private static class QueueItem {
        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
        final Message<byte[]> message;

        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
            this.consumer = consumer;
            this.message = message;
        }
    }
}

