/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.StaleMetadataException;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Fetcher<K, V>
implements SubscriptionState.Listener {
    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
    private final ConsumerNetworkClient client;
    private final Time time;
    private final int minBytes;
    private final int maxBytes;
    private final int maxWaitMs;
    private final int fetchSize;
    private final long retryBackoffMs;
    private final int maxPollRecords;
    private final boolean checkCrcs;
    private final Metadata metadata;
    private final FetchManagerMetrics sensors;
    private final SubscriptionState subscriptions;
    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private PartitionRecords<K, V> nextInLineRecords = null;
    private ExceptionMetadata nextInLineExceptionMetadata = null;

    public Fetcher(ConsumerNetworkClient client, int minBytes, int maxBytes, int maxWaitMs, int fetchSize, int maxPollRecords, boolean checkCrcs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs) {
        this.time = time;
        this.client = client;
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.minBytes = minBytes;
        this.maxBytes = maxBytes;
        this.maxWaitMs = maxWaitMs;
        this.fetchSize = fetchSize;
        this.maxPollRecords = maxPollRecords;
        this.checkCrcs = checkCrcs;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.completedFetches = new ConcurrentLinkedQueue();
        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
        this.retryBackoffMs = retryBackoffMs;
        subscriptions.addListener(this);
    }

    public boolean hasCompletedFetches() {
        return !this.completedFetches.isEmpty();
    }

    private boolean matchesRequestedPartitions(FetchRequest.Builder request, FetchResponse response) {
        Set<TopicPartition> requestedPartitions = request.fetchData().keySet();
        Set<TopicPartition> fetchedPartitions = response.responseData().keySet();
        return fetchedPartitions.equals(requestedPartitions);
    }

    public int sendFetches() {
        Map<Node, FetchRequest.Builder> fetchRequestMap = this.createFetchRequests();
        for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
            final FetchRequest.Builder request = fetchEntry.getValue();
            final Node fetchTarget = fetchEntry.getKey();
            log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), (Object)fetchTarget);
            this.client.send(fetchTarget, request).addListener(new RequestFutureListener<ClientResponse>(){

                @Override
                public void onSuccess(ClientResponse resp) {
                    FetchResponse response = (FetchResponse)resp.responseBody();
                    if (!Fetcher.this.matchesRequestedPartitions(request, response)) {
                        log.warn("Ignoring fetch response containing partitions {} since it does not match the requested partitions {}", response.responseData().keySet(), request.fetchData().keySet());
                        return;
                    }
                    HashSet<TopicPartition> partitions = new HashSet<TopicPartition>(response.responseData().keySet());
                    FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(Fetcher.this.sensors, partitions);
                    for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                        TopicPartition partition = entry.getKey();
                        long fetchOffset = request.fetchData().get((Object)partition).offset;
                        FetchResponse.PartitionData fetchData = entry.getValue();
                        Fetcher.this.completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, request.version()));
                    }
                    Fetcher.this.sensors.fetchLatency.record(resp.requestLatencyMs());
                    Fetcher.this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                }

                @Override
                public void onFailure(RuntimeException e) {
                    log.debug("Fetch request to {} for partitions {} failed", new Object[]{fetchTarget, request.fetchData().keySet(), e});
                }
            });
        }
        return fetchRequestMap.size();
    }

    public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            if (!this.subscriptions.isAssigned(tp) || !this.subscriptions.isOffsetResetNeeded(tp)) continue;
            this.resetOffset(tp);
        }
    }

    public void updateFetchPositions(Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            if (!this.subscriptions.isAssigned(tp) || this.subscriptions.hasValidPosition(tp)) continue;
            if (this.subscriptions.isOffsetResetNeeded(tp)) {
                this.resetOffset(tp);
                continue;
            }
            if (this.subscriptions.committed(tp) == null) {
                this.subscriptions.needOffsetReset(tp);
                this.resetOffset(tp);
                continue;
            }
            long committed = this.subscriptions.committed(tp).offset();
            log.debug("Resetting offset for partition {} to the committed offset {}", (Object)tp, (Object)committed);
            this.subscriptions.seek(tp, committed);
        }
    }

    public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) {
        return this.getTopicMetadata(MetadataRequest.Builder.allTopics(), timeout);
    }

    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder request, long timeout) {
        if (!request.isAllTopics() && request.topics().isEmpty()) {
            return Collections.emptyMap();
        }
        long start = this.time.milliseconds();
        long remaining = timeout;
        do {
            long elapsed;
            RequestFuture<ClientResponse> future = this.sendMetadataRequest(request);
            this.client.poll(future, remaining);
            if (future.failed() && !future.isRetriable()) {
                throw future.exception();
            }
            if (future.succeeded()) {
                MetadataResponse response = (MetadataResponse)future.value().responseBody();
                Cluster cluster = response.cluster();
                Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
                if (!unauthorizedTopics.isEmpty()) {
                    throw new TopicAuthorizationException(unauthorizedTopics);
                }
                boolean shouldRetry = false;
                Map<String, Errors> errors = response.errors();
                if (!errors.isEmpty()) {
                    log.debug("Topic metadata fetch included errors: {}", errors);
                    for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
                        String topic = errorEntry.getKey();
                        Errors error = errorEntry.getValue();
                        if (error == Errors.INVALID_TOPIC_EXCEPTION) {
                            throw new InvalidTopicException("Topic '" + topic + "' is invalid");
                        }
                        if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) continue;
                        if (error.exception() instanceof RetriableException) {
                            shouldRetry = true;
                            continue;
                        }
                        throw new KafkaException("Unexpected error fetching metadata for topic " + topic, error.exception());
                    }
                }
                if (!shouldRetry) {
                    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<String, List<PartitionInfo>>();
                    for (String topic : cluster.topics()) {
                        topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
                    }
                    return topicsPartitionInfos;
                }
            }
            if ((remaining = timeout - (elapsed = this.time.milliseconds() - start)) <= 0L) continue;
            long backoff = Math.min(remaining, this.retryBackoffMs);
            this.time.sleep(backoff);
            remaining -= backoff;
        } while (remaining > 0L);
        throw new TimeoutException("Timeout expired while fetching topic metadata");
    }

    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder request) {
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            return RequestFuture.noBrokersAvailable();
        }
        return this.client.send(node, request);
    }

    private void resetOffset(TopicPartition partition) {
        long timestamp;
        OffsetResetStrategy strategy = this.subscriptions.resetStrategy(partition);
        log.debug("Resetting offset for partition {} to {} offset.", (Object)partition, (Object)strategy.name().toLowerCase(Locale.ROOT));
        if (strategy == OffsetResetStrategy.EARLIEST) {
            timestamp = -2L;
        } else if (strategy == OffsetResetStrategy.LATEST) {
            timestamp = -1L;
        } else {
            throw new NoOffsetForPartitionException(partition);
        }
        Map<TopicPartition, OffsetData> offsetsByTimes = this.retrieveOffsetsByTimes(Collections.singletonMap(partition, timestamp), Long.MAX_VALUE, false);
        OffsetData offsetData = offsetsByTimes.get(partition);
        if (offsetData == null) {
            throw new NoOffsetForPartitionException(partition);
        }
        long offset = offsetData.offset;
        if (this.subscriptions.isAssigned(partition)) {
            this.subscriptions.seek(partition, offset);
        }
    }

    public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, long timeout) {
        Map<TopicPartition, OffsetData> offsetData = this.retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<TopicPartition, OffsetAndTimestamp>(offsetData.size());
        for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) {
            OffsetData data = entry.getValue();
            if (data == null) {
                offsetsByTimes.put(entry.getKey(), null);
                continue;
            }
            offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(data.offset, data.timestamp));
        }
        return offsetsByTimes;
    }

    private Map<TopicPartition, OffsetData> retrieveOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, long timeout, boolean requireTimestamps) {
        long elapsed;
        if (timestampsToSearch.isEmpty()) {
            return Collections.emptyMap();
        }
        long startMs = this.time.milliseconds();
        long remaining = timeout;
        do {
            RequestFuture<Map<TopicPartition, OffsetData>> future = this.sendListOffsetRequests(requireTimestamps, timestampsToSearch);
            this.client.poll(future, remaining);
            if (!future.isDone()) break;
            if (future.succeeded()) {
                return future.value();
            }
            if (!future.isRetriable()) {
                throw future.exception();
            }
            elapsed = this.time.milliseconds() - startMs;
            remaining = timeout - elapsed;
            if (remaining <= 0L) break;
            if (future.exception() instanceof InvalidMetadataException) {
                this.client.awaitMetadataUpdate(remaining);
                continue;
            }
            this.time.sleep(Math.min(remaining, this.retryBackoffMs));
        } while ((remaining = timeout - (elapsed = this.time.milliseconds() - startMs)) > 0L);
        throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms");
    }

    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, long timeout) {
        return this.beginningOrEndOffset(partitions, -2L, timeout);
    }

    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, long timeout) {
        return this.beginningOrEndOffset(partitions, -1L, timeout);
    }

    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions, long timestamp, long timeout) {
        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
        for (TopicPartition tp : partitions) {
            timestampsToSearch.put(tp, timestamp);
        }
        HashMap<TopicPartition, Long> result = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, OffsetData> entry : this.retrieveOffsetsByTimes(timestampsToSearch, timeout, false).entrySet()) {
            result.put(entry.getKey(), entry.getValue().offset);
        }
        return result;
    }

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.nextInLineExceptionMetadata != null) {
            ExceptionMetadata exceptionMetadata = this.nextInLineExceptionMetadata;
            this.nextInLineExceptionMetadata = null;
            TopicPartition tp = exceptionMetadata.partition;
            if (this.subscriptions.isFetchable(tp) && this.subscriptions.position(tp) == exceptionMetadata.fetchedOffset) {
                throw exceptionMetadata.exception;
            }
        }
        HashMap<TopicPartition, List<ConsumerRecord<TopicPartition, List<ConsumerRecord<K, V>>>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<TopicPartition, List<ConsumerRecord<K, V>>>>>();
        int recordsRemaining = this.maxPollRecords;
        while (recordsRemaining > 0) {
            if (this.nextInLineRecords == null || ((PartitionRecords)this.nextInLineRecords).isDrained()) {
                CompletedFetch completedFetch = this.completedFetches.poll();
                if (completedFetch == null) break;
                try {
                    this.nextInLineRecords = this.parseCompletedFetch(completedFetch);
                }
                catch (KafkaException e) {
                    if (drained.isEmpty()) {
                        throw e;
                    }
                    this.nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e);
                }
                continue;
            }
            TopicPartition partition = ((PartitionRecords)this.nextInLineRecords).partition;
            List<ConsumerRecord<K, V>> records = this.drainRecords(this.nextInLineRecords, recordsRemaining);
            if (records.isEmpty()) continue;
            List currentRecords = (List)drained.get(partition);
            if (currentRecords == null) {
                drained.put(partition, records);
            } else {
                ArrayList<ConsumerRecord<K, V>> newRecords = new ArrayList<ConsumerRecord<K, V>>(records.size() + currentRecords.size());
                newRecords.addAll(currentRecords);
                newRecords.addAll(records);
                drained.put(partition, newRecords);
            }
            recordsRemaining -= records.size();
        }
        return drained;
    }

    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
        if (!this.subscriptions.isAssigned(((PartitionRecords)partitionRecords).partition)) {
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", (Object)((PartitionRecords)partitionRecords).partition);
        } else {
            long position = this.subscriptions.position(((PartitionRecords)partitionRecords).partition);
            if (!this.subscriptions.isFetchable(((PartitionRecords)partitionRecords).partition)) {
                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", (Object)((PartitionRecords)partitionRecords).partition);
            } else {
                if (((PartitionRecords)partitionRecords).fetchOffset == position) {
                    Long partitionLag;
                    List partRecords = ((PartitionRecords)partitionRecords).drainRecords(maxRecords);
                    if (!partRecords.isEmpty()) {
                        long nextOffset = ((ConsumerRecord)partRecords.get(partRecords.size() - 1)).offset() + 1L;
                        log.trace("Returning fetched records at offset {} for assigned partition {} and update position to {}", new Object[]{position, ((PartitionRecords)partitionRecords).partition, nextOffset});
                        this.subscriptions.position(((PartitionRecords)partitionRecords).partition, nextOffset);
                    }
                    if ((partitionLag = this.subscriptions.partitionLag(((PartitionRecords)partitionRecords).partition)) != null) {
                        this.sensors.recordPartitionLag(((PartitionRecords)partitionRecords).partition, partitionLag);
                    }
                    return partRecords;
                }
                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", new Object[]{((PartitionRecords)partitionRecords).partition, ((PartitionRecords)partitionRecords).fetchOffset, position});
            }
        }
        ((PartitionRecords)partitionRecords).drain();
        return Collections.emptyList();
    }

    private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequests(boolean requireTimestamps, Map<TopicPartition, Long> timestampsToSearch) {
        HashMap<Node, HashMap<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<Node, HashMap<TopicPartition, Long>>();
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            TopicPartition tp = entry.getKey();
            PartitionInfo info = this.metadata.fetch().partition(tp);
            if (info == null) {
                this.metadata.add(tp.topic());
                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", (Object)tp);
                return RequestFuture.staleMetadata();
            }
            if (info.leader() == null) {
                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", (Object)tp);
                return RequestFuture.leaderNotAvailable();
            }
            Node node = info.leader();
            HashMap<TopicPartition, Long> topicData = (HashMap<TopicPartition, Long>)timestampsToSearchByNode.get(node);
            if (topicData == null) {
                topicData = new HashMap<TopicPartition, Long>();
                timestampsToSearchByNode.put(node, topicData);
            }
            topicData.put(entry.getKey(), entry.getValue());
        }
        final RequestFuture<Map<TopicPartition, OffsetData>> listOffsetRequestsFuture = new RequestFuture<Map<TopicPartition, OffsetData>>();
        final HashMap fetchedTimestampOffsets = new HashMap();
        final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
        for (Map.Entry entry : timestampsToSearchByNode.entrySet()) {
            this.sendListOffsetRequest((Node)entry.getKey(), (Map)entry.getValue(), requireTimestamps).addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onSuccess(Map<TopicPartition, OffsetData> value) {
                    RequestFuture requestFuture = listOffsetRequestsFuture;
                    synchronized (requestFuture) {
                        fetchedTimestampOffsets.putAll(value);
                        if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone()) {
                            listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
                        }
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onFailure(RuntimeException e) {
                    RequestFuture requestFuture = listOffsetRequestsFuture;
                    synchronized (requestFuture) {
                        if (!listOffsetRequestsFuture.isDone()) {
                            listOffsetRequestsFuture.raise(e);
                        }
                    }
                }
            });
        }
        return listOffsetRequestsFuture;
    }

    private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node, final Map<TopicPartition, Long> timestampsToSearch, boolean requireTimestamp) {
        ListOffsetRequest.Builder builder = new ListOffsetRequest.Builder().setTargetTimes(timestampsToSearch);
        builder.setMinVersion(requireTimestamp ? (short)1 : 0);
        log.trace("Sending ListOffsetRequest {} to broker {}", (Object)builder, (Object)node);
        return this.client.send(node, builder).compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetData>>(){

            @Override
            public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetData>> future) {
                ListOffsetResponse lor = (ListOffsetResponse)response.responseBody();
                log.trace("Received ListOffsetResponse {} from broker {}", (Object)lor, (Object)node);
                Fetcher.this.handleListOffsetResponse(timestampsToSearch, lor, future);
            }
        });
    }

    private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch, ListOffsetResponse listOffsetResponse, RequestFuture<Map<TopicPartition, OffsetData>> future) {
        HashMap<TopicPartition, OffsetData> timestampOffsetMap = new HashMap<TopicPartition, OffsetData>();
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
            Errors error = Errors.forCode(partitionData.errorCode);
            if (error == Errors.NONE) {
                if (partitionData.offsets != null) {
                    if (partitionData.offsets.size() > 1) {
                        future.raise(new IllegalStateException("Unexpected partitionData response of length " + partitionData.offsets.size()));
                        return;
                    }
                    long offset = partitionData.offsets.isEmpty() ? -1L : partitionData.offsets.get(0);
                    log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", (Object)topicPartition, (Object)offset);
                    if (offset == -1L) continue;
                    OffsetData offsetData = new OffsetData(offset, null);
                    timestampOffsetMap.put(topicPartition, offsetData);
                    continue;
                }
                log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", new Object[]{topicPartition, partitionData.offset, partitionData.timestamp});
                if (partitionData.offset == -1L) continue;
                OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
                timestampOffsetMap.put(topicPartition, offsetData);
                continue;
            }
            if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
                log.debug("Cannot search by timestamp for partition {} because the message format version is before 0.10.0", (Object)topicPartition);
                timestampOffsetMap.put(topicPartition, null);
                continue;
            }
            if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", (Object)topicPartition);
                future.raise(error);
                continue;
            }
            if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                log.warn("Received unknown topic or partition error in ListOffset request for partition {}. The topic/partition may not exist or the user may not have Describe access to it", (Object)topicPartition);
                future.raise(error);
                continue;
            }
            log.warn("Attempt to fetch offsets for partition {} failed due to: {}", (Object)topicPartition, (Object)error.message());
            future.raise(new StaleMetadataException());
        }
        if (!future.isDone()) {
            future.complete(timestampOffsetMap);
        }
    }

    private List<TopicPartition> fetchablePartitions() {
        HashSet<TopicPartition> exclude = new HashSet<TopicPartition>();
        List<TopicPartition> fetchable = this.subscriptions.fetchablePartitions();
        if (this.nextInLineRecords != null && !((PartitionRecords)this.nextInLineRecords).isDrained()) {
            exclude.add(((PartitionRecords)this.nextInLineRecords).partition);
        }
        for (CompletedFetch completedFetch : this.completedFetches) {
            exclude.add(completedFetch.partition);
        }
        fetchable.removeAll(exclude);
        return fetchable;
    }

    private Map<Node, FetchRequest.Builder> createFetchRequests() {
        Cluster cluster = this.metadata.fetch();
        LinkedHashMap fetchable = new LinkedHashMap();
        for (TopicPartition partition : this.fetchablePartitions()) {
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                this.metadata.requestUpdate();
                continue;
            }
            if (this.client.pendingRequestCount(node) == 0) {
                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = (LinkedHashMap<TopicPartition, FetchRequest.PartitionData>)fetchable.get(node);
                if (fetch == null) {
                    fetch = new LinkedHashMap<TopicPartition, FetchRequest.PartitionData>();
                    fetchable.put(node, fetch);
                }
                long position = this.subscriptions.position(partition);
                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
                log.trace("Added fetch request for partition {} at offset {} to node {}", new Object[]{partition, position, node});
                continue;
            }
            log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", (Object)partition, (Object)node);
        }
        HashMap<Node, FetchRequest.Builder> requests = new HashMap<Node, FetchRequest.Builder>();
        for (Map.Entry entry : fetchable.entrySet()) {
            Node node = (Node)entry.getKey();
            FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, (LinkedHashMap)entry.getValue()).setMaxBytes(this.maxBytes);
            requests.put(node, fetch);
        }
        return requests;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private PartitionRecords<K, V> parseCompletedFetch(CompletedFetch completedFetch) {
        Errors error;
        PartitionRecords parsedRecords;
        int bytes;
        TopicPartition tp;
        block20: {
            tp = completedFetch.partition;
            FetchResponse.PartitionData partition = completedFetch.partitionData;
            long fetchOffset = completedFetch.fetchedOffset;
            bytes = 0;
            int recordsCount = 0;
            parsedRecords = null;
            error = Errors.forCode(partition.errorCode);
            try {
                if (!this.subscriptions.isFetchable(tp)) {
                    log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", (Object)tp);
                    break block20;
                }
                if (error == Errors.NONE) {
                    Long position = this.subscriptions.position(tp);
                    if (position == null || position != fetchOffset) {
                        log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {}", new Object[]{tp, fetchOffset, position});
                        PartitionRecords<K, V> partitionRecords = null;
                        return partitionRecords;
                    }
                    ArrayList<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
                    boolean skippedRecords = false;
                    for (LogEntry logEntry : partition.records.deepEntries()) {
                        if (logEntry.offset() >= position) {
                            parsed.add(this.parseRecord(tp, logEntry));
                            bytes += logEntry.sizeInBytes();
                            continue;
                        }
                        skippedRecords = true;
                    }
                    recordsCount = parsed.size();
                    log.trace("Adding fetched record for partition {} with offset {} to buffered record list", (Object)tp, (Object)position);
                    parsedRecords = new PartitionRecords(fetchOffset, tp, parsed);
                    if (parsed.isEmpty() && !skippedRecords && partition.records.sizeInBytes() > 0) {
                        if (completedFetch.responseVersion < 3) {
                            Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                            throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " + "newer to avoid this issue. Alternately, increase the fetch size on the client (using " + "max.partition.fetch.bytes" + ")", recordTooLargePartitions);
                        }
                        throw new KafkaException("Failed to make progress reading messages at " + tp + "=" + fetchOffset + ". Received a non-empty fetch response from the server, but no " + "complete records were found.");
                    }
                    if (partition.highWatermark >= 0L) {
                        log.trace("Received {} records in fetch response for partition {} with offset {}", new Object[]{parsed.size(), tp, position});
                        this.subscriptions.updateHighWatermark(tp, partition.highWatermark);
                    }
                    break block20;
                }
                if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                    log.debug("Error in fetch for partition {}: {}", (Object)tp, (Object)error.exceptionName());
                    this.metadata.requestUpdate();
                    break block20;
                }
                if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition may not exist or the user may not have Describe access to it", (Object)tp);
                    this.metadata.requestUpdate();
                    break block20;
                }
                if (error == Errors.OFFSET_OUT_OF_RANGE) {
                    if (fetchOffset != this.subscriptions.position(tp)) {
                        log.debug("Discarding stale fetch response for partition {} since the fetched offset {}does not match the current offset {}", new Object[]{tp, fetchOffset, this.subscriptions.position(tp)});
                        break block20;
                    }
                    if (this.subscriptions.hasDefaultOffsetResetPolicy()) {
                        log.info("Fetch offset {} is out of range for partition {}, resetting offset", (Object)fetchOffset, (Object)tp);
                        this.subscriptions.needOffsetReset(tp);
                        break block20;
                    }
                    throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
                }
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    log.warn("Not authorized to read from topic {}.", (Object)tp.topic());
                    throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
                }
                if (error == Errors.UNKNOWN) {
                    log.warn("Unknown error fetching data for topic-partition {}", (Object)tp);
                    break block20;
                }
                throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
            }
            finally {
                completedFetch.metricAggregator.record(tp, bytes, recordsCount);
            }
        }
        if (bytes > 0 || error != Errors.NONE) {
            this.subscriptions.movePartitionToEnd(tp);
        }
        return parsedRecords;
    }

    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
        Record record = logEntry.record();
        if (this.checkCrcs) {
            try {
                record.ensureValid();
            }
            catch (InvalidRecordException e) {
                throw new KafkaException("Record for partition " + partition + " at offset " + logEntry.offset() + " is invalid, cause: " + e.getMessage());
            }
        }
        try {
            long offset = logEntry.offset();
            long timestamp = record.timestamp();
            TimestampType timestampType = record.timestampType();
            ByteBuffer keyBytes = record.key();
            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
            Object key = keyBytes == null ? null : (Object)this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
            ByteBuffer valueBytes = record.value();
            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
            Object value = valueBytes == null ? null : (Object)this.valueDeserializer.deserialize(partition.topic(), valueByteArray);
            return new ConsumerRecord<Object, Object>(partition.topic(), partition.partition(), offset, timestamp, timestampType, record.checksum(), keyByteArray == null ? -1 : keyByteArray.length, valueByteArray == null ? -1 : valueByteArray.length, key, value);
        }
        catch (RuntimeException e) {
            throw new SerializationException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
        }
    }

    @Override
    public void onAssignment(Set<TopicPartition> assignment) {
        this.sensors.updatePartitionLagSensors(assignment);
    }

    private static class ExceptionMetadata {
        private final TopicPartition partition;
        private final long fetchedOffset;
        private final KafkaException exception;

        private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) {
            this.partition = partition;
            this.fetchedOffset = fetchedOffset;
            this.exception = exception;
        }
    }

    private static class FetchManagerMetrics {
        private final Metrics metrics;
        private final String metricGrpName;
        private final Sensor bytesFetched;
        private final Sensor recordsFetched;
        private final Sensor fetchLatency;
        private final Sensor recordsFetchLag;
        private final Sensor fetchThrottleTimeSensor;
        private Set<TopicPartition> assignedPartitions;

        private FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
            this.bytesFetched = metrics.sensor("bytes-fetched");
            this.bytesFetched.add(metrics.metricName("fetch-size-avg", this.metricGrpName, "The average number of bytes fetched per request"), new Avg());
            this.bytesFetched.add(metrics.metricName("fetch-size-max", this.metricGrpName, "The maximum number of bytes fetched per request"), new Max());
            this.bytesFetched.add(metrics.metricName("bytes-consumed-rate", this.metricGrpName, "The average number of bytes consumed per second"), new Rate());
            this.recordsFetched = metrics.sensor("records-fetched");
            this.recordsFetched.add(metrics.metricName("records-per-request-avg", this.metricGrpName, "The average number of records in each request"), new Avg());
            this.recordsFetched.add(metrics.metricName("records-consumed-rate", this.metricGrpName, "The average number of records consumed per second"), new Rate());
            this.fetchLatency = metrics.sensor("fetch-latency");
            this.fetchLatency.add(metrics.metricName("fetch-latency-avg", this.metricGrpName, "The average time taken for a fetch request."), new Avg());
            this.fetchLatency.add(metrics.metricName("fetch-latency-max", this.metricGrpName, "The max time taken for any fetch request."), new Max());
            this.fetchLatency.add(metrics.metricName("fetch-rate", this.metricGrpName, "The number of fetch requests per second."), new Rate(new Count()));
            this.recordsFetchLag = metrics.sensor("records-lag");
            this.recordsFetchLag.add(metrics.metricName("records-lag-max", this.metricGrpName, "The maximum lag in terms of number of records for any partition in this window"), new Max());
            this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
            this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg", this.metricGrpName, "The average throttle time in ms"), new Avg());
            this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max", this.metricGrpName, "The maximum throttle time in ms"), new Max());
        }

        private void recordTopicFetchMetrics(String topic, int bytes, int records) {
            String name = "topic." + topic + ".bytes-fetched";
            Sensor bytesFetched = this.metrics.getSensor(name);
            if (bytesFetched == null) {
                Map<String, String> metricTags = Collections.singletonMap("topic", topic.replace('.', '_'));
                bytesFetched = this.metrics.sensor(name);
                bytesFetched.add(this.metrics.metricName("fetch-size-avg", this.metricGrpName, "The average number of bytes fetched per request for topic " + topic, metricTags), new Avg());
                bytesFetched.add(this.metrics.metricName("fetch-size-max", this.metricGrpName, "The maximum number of bytes fetched per request for topic " + topic, metricTags), new Max());
                bytesFetched.add(this.metrics.metricName("bytes-consumed-rate", this.metricGrpName, "The average number of bytes consumed per second for topic " + topic, metricTags), new Rate());
            }
            bytesFetched.record(bytes);
            name = "topic." + topic + ".records-fetched";
            Sensor recordsFetched = this.metrics.getSensor(name);
            if (recordsFetched == null) {
                HashMap<String, String> metricTags = new HashMap<String, String>(1);
                metricTags.put("topic", topic.replace('.', '_'));
                recordsFetched = this.metrics.sensor(name);
                recordsFetched.add(this.metrics.metricName("records-per-request-avg", this.metricGrpName, "The average number of records in each request for topic " + topic, metricTags), new Avg());
                recordsFetched.add(this.metrics.metricName("records-consumed-rate", this.metricGrpName, "The average number of records consumed per second for topic " + topic, metricTags), new Rate());
            }
            recordsFetched.record(records);
        }

        private void updatePartitionLagSensors(Set<TopicPartition> assignedPartitions) {
            if (this.assignedPartitions != null) {
                for (TopicPartition tp : this.assignedPartitions) {
                    if (assignedPartitions.contains(tp)) continue;
                    this.metrics.removeSensor(FetchManagerMetrics.partitionLagMetricName(tp));
                }
            }
            this.assignedPartitions = assignedPartitions;
        }

        private void recordPartitionLag(TopicPartition tp, long lag) {
            this.recordsFetchLag.record(lag);
            String name = FetchManagerMetrics.partitionLagMetricName(tp);
            Sensor recordsLag = this.metrics.getSensor(name);
            if (recordsLag == null) {
                recordsLag = this.metrics.sensor(name);
                recordsLag.add(this.metrics.metricName(name, this.metricGrpName, "The latest lag of the partition"), new Value());
                recordsLag.add(this.metrics.metricName(name + "-max", this.metricGrpName, "The max lag of the partition"), new Max());
                recordsLag.add(this.metrics.metricName(name + "-avg", this.metricGrpName, "The average lag of the partition"), new Avg());
            }
            recordsLag.record(lag);
        }

        private static String partitionLagMetricName(TopicPartition tp) {
            return tp + ".records-lag";
        }
    }

    private static class FetchResponseMetricAggregator {
        private final FetchManagerMetrics sensors;
        private final Set<TopicPartition> unrecordedPartitions;
        private final FetchMetrics fetchMetrics = new FetchMetrics();
        private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<String, FetchMetrics>();

        private FetchResponseMetricAggregator(FetchManagerMetrics sensors, Set<TopicPartition> partitions) {
            this.sensors = sensors;
            this.unrecordedPartitions = partitions;
        }

        public void record(TopicPartition partition, int bytes, int records) {
            this.unrecordedPartitions.remove(partition);
            this.fetchMetrics.increment(bytes, records);
            String topic = partition.topic();
            FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
            if (topicFetchMetric == null) {
                topicFetchMetric = new FetchMetrics();
                this.topicFetchMetrics.put(topic, topicFetchMetric);
            }
            topicFetchMetric.increment(bytes, records);
            if (this.unrecordedPartitions.isEmpty()) {
                this.sensors.bytesFetched.record(topicFetchMetric.fetchBytes);
                this.sensors.recordsFetched.record(topicFetchMetric.fetchRecords);
                for (Map.Entry<String, FetchMetrics> entry : this.topicFetchMetrics.entrySet()) {
                    FetchMetrics metric = entry.getValue();
                    this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords);
                }
            }
        }

        private static class FetchMetrics {
            private int fetchBytes;
            private int fetchRecords;

            private FetchMetrics() {
            }

            protected void increment(int bytes, int records) {
                this.fetchBytes += bytes;
                this.fetchRecords += records;
            }
        }
    }

    private static class CompletedFetch {
        private final TopicPartition partition;
        private final long fetchedOffset;
        private final FetchResponse.PartitionData partitionData;
        private final FetchResponseMetricAggregator metricAggregator;
        private final short responseVersion;

        private CompletedFetch(TopicPartition partition, long fetchedOffset, FetchResponse.PartitionData partitionData, FetchResponseMetricAggregator metricAggregator, short responseVersion) {
            this.partition = partition;
            this.fetchedOffset = fetchedOffset;
            this.partitionData = partitionData;
            this.metricAggregator = metricAggregator;
            this.responseVersion = responseVersion;
        }
    }

    private static class PartitionRecords<K, V> {
        private long fetchOffset;
        private TopicPartition partition;
        private List<ConsumerRecord<K, V>> records;
        private int position = 0;

        private PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
            this.fetchOffset = fetchOffset;
            this.partition = partition;
            this.records = records;
        }

        private boolean isDrained() {
            return this.records == null;
        }

        private void drain() {
            this.records = null;
        }

        private List<ConsumerRecord<K, V>> drainRecords(int n) {
            if (this.isDrained() || this.position >= this.records.size()) {
                this.drain();
                return Collections.emptyList();
            }
            int limit = Math.min(this.records.size(), this.position + n);
            List<ConsumerRecord<K, V>> res = Collections.unmodifiableList(this.records.subList(this.position, limit));
            this.position = limit;
            if (this.position < this.records.size()) {
                this.fetchOffset = this.records.get(this.position).offset();
            }
            return res;
        }
    }

    private static class OffsetData {
        final long offset;
        final Long timestamp;

        OffsetData(long offset, Long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }
}

