/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.DataOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class FetcherTest {
    private static final double EPSILON = 1.0E-4;
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    private String topicName = "test";
    private String groupId = "test-group";
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    private TopicPartition tp0 = new TopicPartition(this.topicName, 0);
    private TopicPartition tp1 = new TopicPartition(this.topicName, 1);
    private TopicPartition tp2 = new TopicPartition(this.topicName, 2);
    private TopicPartition tp3 = new TopicPartition(this.topicName, 3);
    private MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, 4));
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100L;
    private long requestTimeoutMs = 30000L;
    private MockTime time = new MockTime(1L);
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private FetcherMetricsRegistry metricsRegistry;
    private MockClient client;
    private Metrics metrics;
    private ApiVersions apiVersions = new ApiVersions();
    private ConsumerNetworkClient consumerClient;
    private Fetcher<?, ?> fetcher;
    private MemoryRecords records;
    private MemoryRecords nextRecords;
    private MemoryRecords emptyRecords;
    private MemoryRecords partialRecords;
    private ExecutorService executorService;

    @Before
    public void setup() {
        this.records = this.buildRecords(1L, 3, 1L);
        this.nextRecords = this.buildRecords(4L, 2, 4L);
        this.emptyRecords = this.buildRecords(0L, 0, 0L);
        this.partialRecords = this.buildRecords(4L, 1, 0L);
        this.partialRecords.buffer().putInt(8, 10000);
    }

    private void assignFromUser(Set<TopicPartition> partitions) {
        this.subscriptions.assignFromUser(partitions);
        this.client.updateMetadata(this.initialUpdateResponse);
    }

    @After
    public void teardown() throws Exception {
        if (this.metrics != null) {
            this.metrics.close();
        }
        if (this.fetcher != null) {
            this.fetcher.close();
        }
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            Assert.assertTrue((boolean)this.executorService.awaitTermination(5L, TimeUnit.SECONDS));
        }
    }

    @Test
    public void testFetchNormal() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        List records = partitionRecords.get(this.tp0);
        Assert.assertEquals((long)3L, (long)records.size());
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        long offset = 1L;
        for (ConsumerRecord record : records) {
            Assert.assertEquals((long)offset, (long)record.offset());
            ++offset;
        }
    }

    @Test
    public void testMissingLeaderEpochInRecords() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)0, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L, (long)System.currentTimeMillis(), (int)-1);
        builder.append(0L, "key".getBytes(), "1".getBytes());
        builder.append(0L, "key".getBytes(), "2".getBytes());
        MemoryRecords records = builder.build();
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assert.assertEquals((long)2L, (long)partitionRecords.get(this.tp0).size());
        for (ConsumerRecord record : partitionRecords.get(this.tp0)) {
            Assert.assertEquals(Optional.empty(), (Object)record.leaderEpoch());
        }
    }

    @Test
    public void testLeaderEpochInConsumerRecord() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Integer partitionLeaderEpoch = 1;
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L, (long)System.currentTimeMillis(), (int)partitionLeaderEpoch);
        builder.append(0L, "key".getBytes(), partitionLeaderEpoch.toString().getBytes());
        builder.append(0L, "key".getBytes(), partitionLeaderEpoch.toString().getBytes());
        builder.close();
        partitionLeaderEpoch = partitionLeaderEpoch + 7;
        builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)2L, (long)System.currentTimeMillis(), (int)partitionLeaderEpoch);
        builder.append(0L, "key".getBytes(), partitionLeaderEpoch.toString().getBytes());
        builder.close();
        partitionLeaderEpoch = partitionLeaderEpoch + 5;
        builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)3L, (long)System.currentTimeMillis(), (int)partitionLeaderEpoch);
        builder.append(0L, "key".getBytes(), partitionLeaderEpoch.toString().getBytes());
        builder.append(0L, "key".getBytes(), partitionLeaderEpoch.toString().getBytes());
        builder.append(0L, "key".getBytes(), partitionLeaderEpoch.toString().getBytes());
        builder.close();
        buffer.flip();
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assert.assertEquals((long)6L, (long)partitionRecords.get(this.tp0).size());
        for (ConsumerRecord record : partitionRecords.get(this.tp0)) {
            int expectedLeaderEpoch = Integer.parseInt(Utils.utf8((byte[])((byte[])record.value())));
            Assert.assertEquals(Optional.of(expectedLeaderEpoch), (Object)record.leaderEpoch());
        }
    }

    @Test
    public void testClearBufferedDataForTopicPartitions() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        HashSet<TopicPartition> newAssignedTopicPartitions = new HashSet<TopicPartition>();
        newAssignedTopicPartitions.add(this.tp1);
        this.fetcher.clearBufferedDataForUnassignedPartitions(newAssignedTopicPartitions);
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testFetchSkipsBlackedOutNodes() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.client.updateMetadata(this.initialUpdateResponse);
        Node node = (Node)this.initialUpdateResponse.brokers().iterator().next();
        this.client.blackout(node, 500L);
        Assert.assertEquals((long)0L, (long)this.fetcher.sendFetches());
        this.time.sleep(500L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
    }

    @Test
    public void testFetcherIgnoresControlRecords() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        long producerId = 1L;
        short producerEpoch = 0;
        int baseSequence = 0;
        int partitionLeaderEpoch = 0;
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.idempotentBuilder((ByteBuffer)buffer, (CompressionType)CompressionType.NONE, (long)0L, (long)producerId, (short)producerEpoch, (int)baseSequence);
        builder.append(0L, "key".getBytes(), null);
        builder.close();
        MemoryRecords.writeEndTransactionalMarker((ByteBuffer)buffer, (long)1L, (long)this.time.milliseconds(), (int)partitionLeaderEpoch, (long)producerId, (short)producerEpoch, (EndTransactionMarker)new EndTransactionMarker(ControlRecordType.ABORT, 0));
        buffer.flip();
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, MemoryRecords.readableRecords((ByteBuffer)buffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        List records = partitionRecords.get(this.tp0);
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((long)2L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        ConsumerRecord record = records.get(0);
        Assert.assertArrayEquals((byte[])"key".getBytes(), (byte[])((byte[])record.key()));
    }

    @Test
    public void testFetchError() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertFalse((boolean)partitionRecords.containsKey(this.tp0));
    }

    private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                FetchRequest fetch = (FetchRequest)body;
                return fetch.fetchData().containsKey(tp) && ((FetchRequest.PartitionData)fetch.fetchData().get((Object)tp)).fetchOffset == offset;
            }
        };
    }

    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer(){
            int i = 0;

            public byte[] deserialize(String topic, byte[] data) {
                if (this.i++ % 2 == 1) {
                    Assert.assertEquals((Object)"value-1", (Object)new String(data, StandardCharsets.UTF_8));
                    throw new SerializationException();
                }
                return data;
            }
        };
        this.buildFetcher((Deserializer<?>)deserializer, (Deserializer<?>)deserializer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(this.matchesOffset(this.tp0, 1L), (AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        for (int i = 0; i < 2; ++i) {
            try {
                this.fetcher.fetchedRecords();
                Assert.fail((String)"fetchedRecords should have raised");
                continue;
            }
            catch (SerializationException e) {
                Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
            }
        }
    }

    @Test
    public void testParseCorruptedRecord() throws Exception {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        DataOutputStream out = new DataOutputStream((OutputStream)new ByteBufferOutputStream(buffer));
        byte magic = 1;
        byte[] key = "foo".getBytes();
        byte[] value = "baz".getBytes();
        long offset = 0L;
        long timestamp = 500L;
        int size = LegacyRecord.recordSize((byte)magic, (int)key.length, (int)value.length);
        byte attributes = LegacyRecord.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME);
        long crc = LegacyRecord.computeChecksum((byte)magic, (byte)attributes, (long)timestamp, (byte[])key, (byte[])value);
        out.writeLong(offset);
        out.writeInt(size);
        LegacyRecord.write((DataOutputStream)out, (byte)magic, (long)crc, (byte)LegacyRecord.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME), (long)timestamp, (byte[])key, (byte[])value);
        out.writeLong(offset + 1L);
        out.writeInt(size);
        LegacyRecord.write((DataOutputStream)out, (byte)magic, (long)(crc + 1L), (byte)LegacyRecord.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME), (long)timestamp, (byte[])key, (byte[])value);
        out.writeLong(offset + 2L);
        out.writeInt(size);
        LegacyRecord.write((DataOutputStream)out, (byte)magic, (long)crc, (byte)LegacyRecord.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME), (long)timestamp, (byte[])key, (byte[])value);
        out.writeLong(offset + 3L);
        out.writeInt(1);
        out.writeLong(offset + 4L);
        out.writeInt(size);
        LegacyRecord.write((DataOutputStream)out, (byte)magic, (long)crc, (byte)LegacyRecord.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME), (long)timestamp, (byte[])key, (byte[])value);
        buffer.flip();
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, MemoryRecords.readableRecords((ByteBuffer)buffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)1L, (long)((List)this.fetcher.fetchedRecords().get(this.tp0)).size());
        Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        this.ensureBlockOnRecord(1L);
        this.seekAndConsumeRecord(buffer, 2L);
        this.ensureBlockOnRecord(3L);
        try {
            this.seekAndConsumeRecord(buffer, 4L);
            Assert.fail((String)"Should have thrown exception when fail to retrieve a record from iterator.");
        }
        catch (KafkaException kafkaException) {
            // empty catch block
        }
        this.ensureBlockOnRecord(4L);
    }

    private void ensureBlockOnRecord(long blockedOffset) {
        for (int i = 0; i < 2; ++i) {
            try {
                this.fetcher.fetchedRecords();
                Assert.fail((String)"fetchedRecords should have raised KafkaException");
                continue;
            }
            catch (KafkaException e) {
                Assert.assertEquals((long)blockedOffset, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
            }
        }
    }

    private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) {
        this.subscriptions.seek(this.tp0, toOffset);
        this.fetcher.fetchedRecords();
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, MemoryRecords.readableRecords((ByteBuffer)responseBuffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Map recordsByPartition = this.fetchedRecords();
        List records = recordsByPartition.get(this.tp0);
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((long)toOffset, (long)records.get(0).offset());
        Assert.assertEquals((long)(toOffset + 1L), (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testInvalidDefaultRecordBatch() {
        this.buildFetcher();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(out, 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 10L, 0L, 0, 0, false, false, 0, 1024);
        builder.append(10L, "key".getBytes(), "value".getBytes());
        builder.close();
        buffer.flip();
        buffer.position(17);
        buffer.put("beef".getBytes());
        buffer.position(0);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, MemoryRecords.readableRecords((ByteBuffer)buffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        for (int i = 0; i < 2; ++i) {
            try {
                this.fetcher.fetchedRecords();
                Assert.fail((String)"fetchedRecords should have raised KafkaException");
                continue;
            }
            catch (KafkaException e) {
                Assert.assertEquals((long)0L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
            }
        }
    }

    @Test
    public void testParseInvalidRecordBatch() {
        this.buildFetcher();
        MemoryRecords records = MemoryRecords.withRecords((byte)2, (long)0L, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (SimpleRecord[])new SimpleRecord[]{new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), new SimpleRecord(3L, "c".getBytes(), "3".getBytes())});
        ByteBuffer buffer = records.buffer();
        buffer.putInt(32, buffer.get(32) ^ 0x5332717);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, MemoryRecords.readableRecords((ByteBuffer)buffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        try {
            this.fetcher.fetchedRecords();
            Assert.fail((String)"fetchedRecords should have raised");
        }
        catch (KafkaException e) {
            Assert.assertEquals((long)0L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        }
    }

    @Test
    public void testHeaders() {
        this.buildFetcher();
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)1L);
        builder.append(0L, "key".getBytes(), "value-1".getBytes());
        Header[] headersArray = new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))};
        builder.append(0L, "key".getBytes(), "value-2".getBytes(), headersArray);
        Header[] headersArray2 = new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8))};
        builder.append(0L, "key".getBytes(), "value-3".getBytes(), headersArray2);
        MemoryRecords memoryRecords = builder.build();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(this.matchesOffset(this.tp0, 1L), (AbstractResponse)this.fullFetchResponse(this.tp0, memoryRecords, Errors.NONE, 100L, 0));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Map recordsByPartition = this.fetchedRecords();
        List records = recordsByPartition.get(this.tp0);
        Assert.assertEquals((long)3L, (long)records.size());
        Iterator recordIterator = records.iterator();
        ConsumerRecord record = recordIterator.next();
        Assert.assertNull((Object)record.headers().lastHeader("headerKey"));
        record = recordIterator.next();
        Assert.assertEquals((Object)"headerValue", (Object)new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assert.assertEquals((Object)"headerKey", (Object)record.headers().lastHeader("headerKey").key());
        record = recordIterator.next();
        Assert.assertEquals((Object)"headerValue2", (Object)new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assert.assertEquals((Object)"headerKey", (Object)record.headers().lastHeader("headerKey").key());
    }

    @Test
    public void testFetchMaxPollRecords() {
        this.buildFetcher(2);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(this.matchesOffset(this.tp0, 1L), (AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.client.prepareResponse(this.matchesOffset(this.tp0, 4L), (AbstractResponse)this.fullFetchResponse(this.tp0, this.nextRecords, Errors.NONE, 100L, 0));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Map recordsByPartition = this.fetchedRecords();
        List records = recordsByPartition.get(this.tp0);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)3L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)1L, (long)records.get(0).offset());
        Assert.assertEquals((long)2L, (long)records.get(1).offset());
        Assert.assertEquals((long)0L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        recordsByPartition = this.fetchedRecords();
        records = recordsByPartition.get(this.tp0);
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)3L, (long)records.get(0).offset());
        Assert.assertTrue((this.fetcher.sendFetches() > 0 ? 1 : 0) != 0);
        this.consumerClient.poll(this.time.timer(0L));
        recordsByPartition = this.fetchedRecords();
        records = recordsByPartition.get(this.tp0);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)6L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)4L, (long)records.get(0).offset());
        Assert.assertEquals((long)5L, (long)records.get(1).offset());
    }

    @Test
    public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() {
        this.buildFetcher(2);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(this.matchesOffset(this.tp0, 1L), (AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Map recordsByPartition = this.fetchedRecords();
        List records = recordsByPartition.get(this.tp0);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)3L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)1L, (long)records.get(0).offset());
        Assert.assertEquals((long)2L, (long)records.get(1).offset());
        this.assignFromUser(Collections.singleton(this.tp1));
        this.client.prepareResponse(this.matchesOffset(this.tp1, 4L), (AbstractResponse)this.fullFetchResponse(this.tp1, this.nextRecords, Errors.NONE, 100L, 0));
        this.subscriptions.seek(this.tp1, 4L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertNull(fetchedRecords.get(this.tp0));
        records = fetchedRecords.get(this.tp1);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)6L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assert.assertEquals((long)4L, (long)records.get(0).offset());
        Assert.assertEquals((long)5L, (long)records.get(1).offset());
    }

    @Test
    public void testFetchNonContinuousRecords() {
        this.buildFetcher();
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        builder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords records = builder.build();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Map recordsByPartition = this.fetchedRecords();
        List consumerRecords = recordsByPartition.get(this.tp0);
        Assert.assertEquals((long)3L, (long)consumerRecords.size());
        Assert.assertEquals((long)31L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)15L, (long)consumerRecords.get(0).offset());
        Assert.assertEquals((long)20L, (long)consumerRecords.get(1).offset());
        Assert.assertEquals((long)30L, (long)consumerRecords.get(2).offset());
    }

    @Test
    public void testFetchRequestWhenRecordTooLarge() {
        try {
            this.buildFetcher();
            this.client.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(new ApiVersionsResponse.ApiVersion(ApiKeys.FETCH, 2, 2))));
            this.makeFetchRequestWithIncompleteRecord();
            try {
                this.fetcher.fetchedRecords();
                Assert.fail((String)"RecordTooLargeException should have been raised");
            }
            catch (RecordTooLargeException e) {
                Assert.assertTrue((boolean)e.getMessage().startsWith("There are some messages at [Partition=Offset]: "));
                Assert.assertEquals((long)0L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
            }
        }
        finally {
            this.client.setNodeApiVersions(NodeApiVersions.create());
        }
    }

    @Test
    public void testFetchRequestInternalError() {
        this.buildFetcher();
        this.makeFetchRequestWithIncompleteRecord();
        try {
            this.fetcher.fetchedRecords();
            Assert.fail((String)"RecordTooLargeException should have been raised");
        }
        catch (KafkaException e) {
            Assert.assertTrue((boolean)e.getMessage().startsWith("Failed to make progress reading messages"));
            Assert.assertEquals((long)0L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        }
    }

    private void makeFetchRequestWithIncompleteRecord() {
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        MemoryRecords partialRecord = MemoryRecords.readableRecords((ByteBuffer)ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}));
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, partialRecord, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testUnauthorizedTopic() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        try {
            this.fetcher.fetchedRecords();
            Assert.fail((String)"fetchedRecords should have thrown");
        }
        catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.topicName), (Object)e.unauthorizedTopics());
        }
    }

    @Test
    public void testFetchDuringRebalance() {
        this.buildFetcher();
        this.subscriptions.subscribe(Collections.singleton(this.topicName), this.listener);
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.client.updateMetadata(this.initialUpdateResponse);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.fetchedRecords().isEmpty());
    }

    @Test
    public void testInFlightFetchOnPausedPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.subscriptions.pause(this.tp0);
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertNull(this.fetcher.fetchedRecords().get(this.tp0));
    }

    @Test
    public void testFetchOnPausedPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.pause(this.tp0);
        Assert.assertFalse((this.fetcher.sendFetches() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)this.client.requests().isEmpty());
    }

    @Test
    public void testFetchNotLeaderForPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchUnknownTopicOrPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchFencedLeaderEpoch() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.FENCED_LEADER_EPOCH, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((String)"Should not return any records", (long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertEquals((String)"Should have requested metadata update", (long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchUnknownLeaderEpoch() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.UNKNOWN_LEADER_EPOCH, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((String)"Should not return any records", (long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertNotEquals((String)"Should not have requested metadata update", (long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testEpochSetInFetchRequest() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap(this.topicName, 4), tp -> 99);
        this.client.updateMetadata(metadataResponse);
        this.subscriptions.seek(this.tp0, 10L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        MockClient.RequestMatcher matcher = body -> {
            if (body instanceof FetchRequest) {
                FetchRequest fetchRequest = (FetchRequest)body;
                fetchRequest.fetchData().values().forEach(partitionData -> {
                    Assert.assertTrue((String)"Expected Fetcher to set leader epoch in request", (boolean)partitionData.currentLeaderEpoch.isPresent());
                    Assert.assertEquals((String)"Expected leader epoch to match epoch from metadata update", (long)99L, (long)((Integer)partitionData.currentLeaderEpoch.get()).longValue());
                });
                return true;
            }
            Assert.fail((String)"Should have seen FetchRequest");
            return false;
        };
        this.client.prepareResponse(matcher, (AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
    }

    @Test
    public void testFetchOffsetOutOfRange() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertNull((Object)this.subscriptions.validPosition(this.tp0));
        Assert.assertNotNull((Object)this.subscriptions.position(this.tp0));
    }

    @Test
    public void testStaleOutOfRangeError() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.subscriptions.seek(this.tp0, 1L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testFetchedRecordsAfterSeek() {
        this.buildFetcher(OffsetResetStrategy.NONE, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertTrue((this.fetcher.sendFetches() > 0 ? 1 : 0) != 0);
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        this.subscriptions.seek(this.tp0, 2L);
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
    }

    @Test
    public void testFetchOffsetOutOfRangeException() {
        this.buildFetcher(OffsetResetStrategy.NONE, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        for (int i = 0; i < 2; ++i) {
            OffsetOutOfRangeException e = (OffsetOutOfRangeException)Assert.assertThrows(OffsetOutOfRangeException.class, () -> this.fetcher.fetchedRecords());
            Assert.assertEquals(Collections.singleton(this.tp0), e.offsetOutOfRangePartitions().keySet());
            Assert.assertEquals((long)0L, (long)((Long)e.offsetOutOfRangePartitions().get(this.tp0)));
        }
    }

    @Test
    public void testFetchPositionAfterException() {
        this.buildFetcher(OffsetResetStrategy.NONE, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        this.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 1L);
        this.subscriptions.seek(this.tp1, 1L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, null, (BaseRecords)this.records));
        partitions.put(this.tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, null, (BaseRecords)MemoryRecords.EMPTY));
        this.client.prepareResponse((AbstractResponse)new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        ArrayList<ConsumerRecord<byte[], byte[]>> allFetchedRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        this.fetchRecordsInto(allFetchedRecords);
        Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assert.assertEquals((long)3L, (long)allFetchedRecords.size());
        OffsetOutOfRangeException e = (OffsetOutOfRangeException)Assert.assertThrows(OffsetOutOfRangeException.class, () -> this.fetchRecordsInto(allFetchedRecords));
        Assert.assertEquals(Collections.singleton(this.tp0), e.offsetOutOfRangePartitions().keySet());
        Assert.assertEquals((long)1L, (long)((Long)e.offsetOutOfRangePartitions().get(this.tp0)));
        Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assert.assertEquals((long)3L, (long)allFetchedRecords.size());
    }

    private void fetchRecordsInto(List<ConsumerRecord<byte[], byte[]>> allFetchedRecords) {
        Map fetchedRecords = this.fetchedRecords();
        fetchedRecords.values().forEach(allFetchedRecords::addAll);
    }

    @Test
    public void testCompletedFetchRemoval() {
        this.buildFetcher(OffsetResetStrategy.NONE, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        this.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.tp0, this.tp1, this.tp2, this.tp3}));
        this.client.updateMetadata(this.initialUpdateResponse);
        this.subscriptions.seek(this.tp0, 1L);
        this.subscriptions.seek(this.tp1, 1L);
        this.subscriptions.seek(this.tp2, 1L);
        this.subscriptions.seek(this.tp3, 1L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, null, (BaseRecords)this.records));
        partitions.put(this.tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, null, (BaseRecords)MemoryRecords.EMPTY));
        partitions.put(this.tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, 4L, 0L, null, (BaseRecords)this.nextRecords));
        partitions.put(this.tp3, new FetchResponse.PartitionData(Errors.NONE, 100L, 4L, 0L, null, (BaseRecords)this.partialRecords));
        this.client.prepareResponse((AbstractResponse)new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        ArrayList fetchedRecords = new ArrayList();
        Map recordsByPartition = this.fetchedRecords();
        for (List list : recordsByPartition.values()) {
            fetchedRecords.addAll(list);
        }
        Assert.assertEquals((long)fetchedRecords.size(), (long)(this.subscriptions.position((TopicPartition)this.tp1).offset - 1L));
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assert.assertEquals((long)3L, (long)fetchedRecords.size());
        ArrayList<OffsetOutOfRangeException> oorExceptions = new ArrayList<OffsetOutOfRangeException>();
        try {
            recordsByPartition = this.fetchedRecords();
            for (List list : recordsByPartition.values()) {
                fetchedRecords.addAll(list);
            }
        }
        catch (OffsetOutOfRangeException offsetOutOfRangeException) {
            oorExceptions.add(offsetOutOfRangeException);
        }
        Assert.assertEquals((long)1L, (long)oorExceptions.size());
        OffsetOutOfRangeException offsetOutOfRangeException = (OffsetOutOfRangeException)((Object)oorExceptions.get(0));
        Assert.assertTrue((boolean)offsetOutOfRangeException.offsetOutOfRangePartitions().containsKey(this.tp0));
        Assert.assertEquals((long)offsetOutOfRangeException.offsetOutOfRangePartitions().size(), (long)1L);
        recordsByPartition = this.fetchedRecords();
        for (List list : recordsByPartition.values()) {
            fetchedRecords.addAll(list);
        }
        Assert.assertEquals((long)6L, (long)this.subscriptions.position((TopicPartition)this.tp2).offset);
        Assert.assertEquals((long)5L, (long)fetchedRecords.size());
        int n = 3;
        ArrayList<KafkaException> arrayList = new ArrayList<KafkaException>();
        for (int i = 1; i <= n; ++i) {
            try {
                recordsByPartition = this.fetchedRecords();
                for (List records : recordsByPartition.values()) {
                    fetchedRecords.addAll(records);
                }
                continue;
            }
            catch (KafkaException e) {
                arrayList.add(e);
            }
        }
        Assert.assertEquals((long)n, (long)arrayList.size());
    }

    @Test
    public void testSeekBeforeException() {
        this.buildFetcher(OffsetResetStrategy.NONE, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        this.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.tp0}));
        this.subscriptions.seek(this.tp0, 1L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        HashMap<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, Optional.empty(), null, (BaseRecords)this.records));
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)2L, (long)((List)this.fetcher.fetchedRecords().get(this.tp0)).size());
        this.subscriptions.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp1, 1L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        partitions = new HashMap();
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, Optional.empty(), null, (BaseRecords)MemoryRecords.EMPTY));
        this.client.prepareResponse((AbstractResponse)new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)1L, (long)((List)this.fetcher.fetchedRecords().get(this.tp0)).size());
        this.subscriptions.seek(this.tp1, 10L);
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
    }

    @Test
    public void testFetchDisconnected() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0), true);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)0L, (long)this.fetcher.fetchedRecords().size());
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)0L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionNoOpWithPositionSet() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 5L);
        this.fetcher.resetOffsetsIfNeeded();
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.updateMetadata(this.initialUpdateResponse);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testFetchOffsetErrors() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)this.subscriptions.position((TopicPartition)this.tp0).offset, (long)5L);
    }

    @Test
    public void testListOffsetSendsReadUncommitted() {
        this.testListOffsetsSendsIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    }

    @Test
    public void testListOffsetSendsReadCommitted() {
        this.testListOffsetsSendsIsolationLevel(IsolationLevel.READ_COMMITTED);
    }

    private void testListOffsetsSendsIsolationLevel(IsolationLevel isolationLevel) {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(body -> {
            ListOffsetRequest request = (ListOffsetRequest)body;
            return request.isolationLevel() == isolationLevel;
        }, (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testResetOffsetsSkipsBlackedOutConnections() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.client.updateMetadata(this.initialUpdateResponse);
        Node node = (Node)this.initialUpdateResponse.brokers().iterator().next();
        this.client.blackout(node, 500L);
        this.fetcher.resetOffsetsIfNeeded();
        Assert.assertEquals((long)0L, (long)this.consumerClient.pendingRequestCount());
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals((Object)OffsetResetStrategy.EARLIEST, (Object)this.subscriptions.resetStrategy(this.tp0));
        this.time.sleep(500L);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testResetOffsetsMetadataRefresh() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NOT_LEADER_FOR_PARTITION, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.client.prepareMetadataUpdate(this.initialUpdateResponse);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasPendingMetadataUpdates());
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionDisconnect() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L), true);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.client.prepareMetadataUpdate(this.initialUpdateResponse);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasPendingMetadataUpdates());
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testAssignmentChangeWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue((boolean)this.client.hasInFlightRequests());
        this.assignFromUser(Collections.singleton(this.tp1));
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasPendingResponses());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertFalse((boolean)this.subscriptions.isAssigned(this.tp0));
    }

    @Test
    public void testSeekWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.seek(this.tp0, 237L);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasPendingResponses());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertEquals((long)237L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=10000L)
    public void testEarlierOffsetResetArrivesLate() throws InterruptedException {
        LogContext lc = new LogContext();
        this.buildFetcher((SubscriptionState)Mockito.spy((Object)new SubscriptionState(lc, OffsetResetStrategy.EARLIEST)), lc);
        this.assignFromUser(Collections.singleton(this.tp0));
        ExecutorService es = Executors.newSingleThreadExecutor();
        CountDownLatch latchLatestStart = new CountDownLatch(1);
        CountDownLatch latchEarliestStart = new CountDownLatch(1);
        CountDownLatch latchEarliestDone = new CountDownLatch(1);
        CountDownLatch latchEarliestFinish = new CountDownLatch(1);
        try {
            ((SubscriptionState)Mockito.doAnswer(invocation -> {
                latchLatestStart.countDown();
                latchEarliestStart.await();
                Object result = invocation.callRealMethod();
                latchEarliestDone.countDown();
                return result;
            }).when((Object)this.subscriptions)).maybeSeekUnvalidated(this.tp0, 0L, OffsetResetStrategy.EARLIEST);
            es.submit(() -> {
                this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
                this.fetcher.resetOffsetsIfNeeded();
                this.consumerClient.pollNoWakeup();
                this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 0L));
                this.consumerClient.pollNoWakeup();
                latchEarliestFinish.countDown();
            }, Void.class);
            latchLatestStart.await();
            this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
            this.fetcher.resetOffsetsIfNeeded();
            this.consumerClient.pollNoWakeup();
            this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 10L));
            latchEarliestStart.countDown();
            latchEarliestDone.await();
            this.consumerClient.pollNoWakeup();
            latchEarliestFinish.await();
            Assert.assertEquals((long)10L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        }
        finally {
            es.shutdown();
            es.awaitTermination(10000L, TimeUnit.MILLISECONDS);
        }
    }

    @Test
    public void testChangeResetWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasPendingResponses());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals((Object)OffsetResetStrategy.EARLIEST, (Object)this.subscriptions.resetStrategy(this.tp0));
    }

    @Test
    public void testIdempotentResetWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testRestOffsetsAuthorizationFailure() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        try {
            this.fetcher.resetOffsetsIfNeeded();
            Assert.fail((String)"Expected authorization error to be raised");
        }
        catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.tp0.topic()), (Object)e.unauthorizedTopics());
        }
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.pause(this.tp0);
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 10L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals((long)10L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0);
        this.subscriptions.pause(this.tp0);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 10L);
        this.subscriptions.pause(this.tp0);
        this.fetcher.resetOffsetsIfNeeded();
        Assert.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals((long)10L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testGetAllTopics() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.NONE));
        Map allTopics = this.fetcher.getAllTopicMetadata(this.time.timer(5000L));
        Assert.assertEquals((long)this.initialUpdateResponse.topicMetadata().size(), (long)allTopics.size());
    }

    @Test
    public void testGetAllTopicsDisconnect() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(null, true);
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.NONE));
        Map allTopics = this.fetcher.getAllTopicMetadata(this.time.timer(5000L));
        Assert.assertEquals((long)this.initialUpdateResponse.topicMetadata().size(), (long)allTopics.size());
    }

    @Test(expected=TimeoutException.class)
    public void testGetAllTopicsTimeout() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.fetcher.getAllTopicMetadata(this.time.timer(50L));
    }

    @Test
    public void testGetAllTopicsUnauthorized() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.TOPIC_AUTHORIZATION_FAILED));
        try {
            this.fetcher.getAllTopicMetadata(this.time.timer(10L));
            Assert.fail();
        }
        catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.topicName), (Object)e.unauthorizedTopics());
        }
    }

    @Test(expected=InvalidTopicException.class)
    public void testGetTopicMetadataInvalidTopic() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.INVALID_TOPIC_EXCEPTION));
        this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), true), this.time.timer(5000L));
    }

    @Test
    public void testGetTopicMetadataUnknownTopic() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));
        Map topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), true), this.time.timer(5000L));
        Assert.assertNull(topicMetadata.get(this.topicName));
    }

    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.LEADER_NOT_AVAILABLE));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.NONE));
        Map topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), true), this.time.timer(5000L));
        Assert.assertTrue((boolean)topicMetadata.containsKey(this.topicName));
    }

    @Test
    public void testGetTopicMetadataOfflinePartitions() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        MetadataResponse originalResponse = this.newMetadataResponse(this.topicName, Errors.NONE);
        ArrayList<MetadataResponse.TopicMetadata> altTopics = new ArrayList<MetadataResponse.TopicMetadata>();
        for (MetadataResponse.TopicMetadata item : originalResponse.topicMetadata()) {
            List partitions = item.partitionMetadata();
            ArrayList<MetadataResponse.PartitionMetadata> altPartitions = new ArrayList<MetadataResponse.PartitionMetadata>();
            for (MetadataResponse.PartitionMetadata p : partitions) {
                altPartitions.add(new MetadataResponse.PartitionMetadata(p.error(), p.partition(), null, Optional.empty(), p.replicas(), p.isr(), p.offlineReplicas()));
            }
            MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata(item.error(), item.topic(), item.isInternal(), altPartitions);
            altTopics.add(alteredTopic);
        }
        Node controller = originalResponse.controller();
        MetadataResponse altered = MetadataResponse.prepareResponse((Collection)originalResponse.brokers(), (String)originalResponse.clusterId(), (int)(controller != null ? controller.id() : -1), altTopics);
        this.client.prepareResponse((AbstractResponse)altered);
        Map topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), false), this.time.timer(5000L));
        Assert.assertNotNull((Object)topicMetadata);
        Assert.assertNotNull(topicMetadata.get(this.topicName));
        Assert.assertEquals((long)this.metadata.fetch().partitionCountForTopic(this.topicName).longValue(), (long)((List)topicMetadata.get(this.topicName)).size());
    }

    @Test
    public void testQuotaMetrics() {
        this.buildFetcher();
        MockSelector selector = new MockSelector(this.time);
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor((Metrics)this.metrics, (FetcherMetricsRegistry)this.metricsRegistry);
        Cluster cluster = TestUtils.singletonCluster("test", 1);
        Node node = (Node)cluster.nodes().get(0);
        NetworkClient client = new NetworkClient((Selectable)selector, (Metadata)this.metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, 1000, ClientDnsLookup.DEFAULT, (Time)this.time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
        short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
        ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse((int)400, (byte)2).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
        while (!client.ready(node, this.time.milliseconds())) {
            client.poll(1L, this.time.milliseconds());
            this.time.sleep(client.throttleDelayMs(node, this.time.milliseconds()));
        }
        selector.clear();
        for (int i = 1; i <= 3; ++i) {
            int throttleTimeMs = 100 * i;
            FetchRequest.Builder builder = FetchRequest.Builder.forConsumer((int)100, (int)100, new LinkedHashMap());
            builder.rackId("");
            ClientRequest request = client.newClientRequest(node.idString(), (AbstractRequest.Builder)builder, this.time.milliseconds(), true);
            client.send(request, this.time.milliseconds());
            client.poll(1L, this.time.milliseconds());
            FetchResponse<MemoryRecords> response = this.fullFetchResponse(this.tp0, this.nextRecords, Errors.NONE, i, throttleTimeMs);
            buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()));
            selector.completeReceive(new NetworkReceive(node.idString(), buffer));
            client.poll(1L, this.time.milliseconds());
            this.time.sleep(client.throttleDelayMs(node, this.time.milliseconds()));
            selector.clear();
        }
        Map allMetrics = this.metrics.metrics();
        KafkaMetric avgMetric = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeAvg, new String[0]));
        KafkaMetric maxMetric = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeMax, new String[0]));
        Assert.assertEquals((double)250.0, (double)((Double)avgMetric.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)400.0, (double)((Double)maxMetric.metricValue()), (double)1.0E-4);
        client.close();
    }

    @Test
    public void testFetcherMetrics() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        MetricName maxLagMetric = this.metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLagMetric = this.metrics.metricName("records-lag", this.metricGroup, tags);
        Map allMetrics = this.metrics.metrics();
        KafkaMetric recordsFetchLagMax = (KafkaMetric)allMetrics.get(maxLagMetric);
        Assert.assertEquals((double)Double.NaN, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        this.fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
        Assert.assertEquals((double)100.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        KafkaMetric partitionLag = (KafkaMetric)allMetrics.get(partitionLagMetric);
        Assert.assertEquals((double)100.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(), ("value-" + v).getBytes());
        }
        this.fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, 0);
        Assert.assertEquals((double)197.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)197.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assert.assertFalse((boolean)allMetrics.containsKey(partitionLagMetric));
    }

    @Test
    public void testFetcherLeadMetric() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        MetricName minLeadMetric = this.metrics.metricInstance(this.metricsRegistry.recordsLeadMin, new String[0]);
        HashMap<String, String> tags = new HashMap<String, String>(2);
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLeadMetric = this.metrics.metricName("records-lead", this.metricGroup, "", tags);
        Map allMetrics = this.metrics.metrics();
        KafkaMetric recordsFetchLeadMin = (KafkaMetric)allMetrics.get(minLeadMetric);
        Assert.assertEquals((double)Double.NaN, (double)((Double)recordsFetchLeadMin.metricValue()), (double)1.0E-4);
        this.fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
        Assert.assertEquals((double)0.0, (double)((Double)recordsFetchLeadMin.metricValue()), (double)1.0E-4);
        KafkaMetric partitionLead = (KafkaMetric)allMetrics.get(partitionLeadMetric);
        Assert.assertEquals((double)0.0, (double)((Double)partitionLead.metricValue()), (double)1.0E-4);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(), ("value-" + v).getBytes());
        }
        this.fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
        Assert.assertEquals((double)0.0, (double)((Double)recordsFetchLeadMin.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)3.0, (double)((Double)partitionLead.metricValue()), (double)1.0E-4);
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assert.assertFalse((boolean)allMetrics.containsKey(partitionLeadMetric));
    }

    @Test
    public void testReadCommittedLagMetric() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        MetricName maxLagMetric = this.metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        HashMap<String, String> tags = new HashMap<String, String>();
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLagMetric = this.metrics.metricName("records-lag", this.metricGroup, tags);
        Map allMetrics = this.metrics.metrics();
        KafkaMetric recordsFetchLagMax = (KafkaMetric)allMetrics.get(maxLagMetric);
        Assert.assertEquals((double)Double.NaN, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        this.fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
        Assert.assertEquals((double)50.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        KafkaMetric partitionLag = (KafkaMetric)allMetrics.get(partitionLagMetric);
        Assert.assertEquals((double)50.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(), ("value-" + v).getBytes());
        }
        this.fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, 150L, 0);
        Assert.assertEquals((double)147.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)147.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assert.assertFalse((boolean)allMetrics.containsKey(partitionLagMetric));
    }

    @Test
    public void testFetchResponseMetrics() {
        this.buildFetcher();
        String topic1 = "foo";
        String topic2 = "bar";
        TopicPartition tp1 = new TopicPartition(topic1, 0);
        TopicPartition tp2 = new TopicPartition(topic2, 0);
        this.subscriptions.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{tp1, tp2}));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(topic1, 1);
        partitionCounts.put(topic2, 1);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, partitionCounts));
        int expectedBytes = 0;
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> fetchPartitionData = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        for (TopicPartition tp : Utils.mkSet((Object[])new TopicPartition[]{tp1, tp2})) {
            this.subscriptions.seek(tp, 0L);
            MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
            for (int v = 0; v < 3; ++v) {
                builder.appendWithOffset((long)v, -1L, "key".getBytes(), ("value-" + v).getBytes());
            }
            MemoryRecords records = builder.build();
            for (Record record : records.records()) {
                expectedBytes += record.sizeInBytes();
            }
            fetchPartitionData.put(tp, new FetchResponse.PartitionData(Errors.NONE, 15L, -1L, 0L, null, (BaseRecords)records));
        }
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)new FetchResponse(Errors.NONE, fetchPartitionData, 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertEquals((long)3L, (long)fetchedRecords.get(tp1).size());
        Assert.assertEquals((long)3L, (long)fetchedRecords.get(tp2).size());
        Map allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        Assert.assertEquals((double)expectedBytes, (double)((Double)fetchSizeAverage.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)6.0, (double)((Double)recordsCountAverage.metricValue()), (double)1.0E-4);
    }

    @Test
    public void testFetchResponseMetricsPartialResponse() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        Map allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(), ("value-" + v).getBytes());
        }
        MemoryRecords records = builder.build();
        int expectedBytes = 0;
        for (Record record : records.records()) {
            if (record.offset() < 1L) continue;
            expectedBytes += record.sizeInBytes();
        }
        this.fetchRecords(this.tp0, records, Errors.NONE, 100L, 0);
        Assert.assertEquals((double)expectedBytes, (double)((Double)fetchSizeAverage.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)2.0, (double)((Double)recordsCountAverage.metricValue()), (double)1.0E-4);
    }

    @Test
    public void testFetchResponseMetricsWithOnePartitionError() {
        this.buildFetcher();
        this.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);
        Map allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(), ("value-" + v).getBytes());
        }
        MemoryRecords records = builder.build();
        HashMap<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, null, (BaseRecords)records));
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, 0L, null, (BaseRecords)MemoryRecords.EMPTY));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetcher.fetchedRecords();
        int expectedBytes = 0;
        for (Record record : records.records()) {
            expectedBytes += record.sizeInBytes();
        }
        Assert.assertEquals((double)expectedBytes, (double)((Double)fetchSizeAverage.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)3.0, (double)((Double)recordsCountAverage.metricValue()), (double)1.0E-4);
    }

    @Test
    public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
        this.buildFetcher();
        this.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);
        Map allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.subscriptions.seek(this.tp1, 5L);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(), ("value-" + v).getBytes());
        }
        MemoryRecords records = builder.build();
        HashMap<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, null, (BaseRecords)records));
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, null, (BaseRecords)MemoryRecords.withRecords((CompressionType)CompressionType.NONE, (SimpleRecord[])new SimpleRecord[]{new SimpleRecord("val".getBytes())})));
        this.client.prepareResponse((AbstractResponse)new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetcher.fetchedRecords();
        int expectedBytes = 0;
        for (Record record : records.records()) {
            expectedBytes += record.sizeInBytes();
        }
        Assert.assertEquals((double)expectedBytes, (double)((Double)fetchSizeAverage.metricValue()), (double)1.0E-4);
        Assert.assertEquals((double)3.0, (double)((Double)recordsCountAverage.metricValue()), (double)1.0E-4);
    }

    @Test
    public void testFetcherMetricsTemplates() {
        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
        this.buildFetcher(new MetricConfig().tags(clientTags), OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Fetcher.throttleTimeSensor((Metrics)this.metrics, (FetcherMetricsRegistry)this.metricsRegistry);
        HashSet<MetricNameTemplate> allMetrics = new HashSet<MetricNameTemplate>();
        for (MetricName n : this.metrics.metrics().keySet()) {
            String name = n.name().replaceAll(this.tp0.toString(), "{topic}-{partition}");
            if (n.group().equals("kafka-metrics-count")) continue;
            allMetrics.add(new MetricNameTemplate(name, n.group(), "", n.tags().keySet()));
        }
        TestUtils.checkEquals(allMetrics, new HashSet(this.metricsRegistry.getAllTemplates()), "metrics", "templates");
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
        return this.fetchRecords(tp, records, error, hw, -1L, throttleTime);
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(tp, records, error, hw, lastStableOffset, throttleTime));
        this.consumerClient.poll(this.time.timer(0L));
        return this.fetchedRecords();
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fetchResponse(tp, records, error, hw, lastStableOffset, logStartOffset, throttleTime));
        this.consumerClient.poll(this.time.timer(0L));
        return this.fetchedRecords();
    }

    @Test
    public void testGetOffsetsForTimesTimeout() {
        try {
            this.buildFetcher();
            this.fetcher.offsetsForTimes(Collections.singletonMap(new TopicPartition(this.topicName, 2), 1000L), this.time.timer(100L));
            Assert.fail((String)"Should throw timeout exception.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }

    @Test
    public void testGetOffsetsForTimes() {
        this.buildFetcher();
        Assert.assertTrue((boolean)this.fetcher.offsetsForTimes(new HashMap(), this.time.timer(100L)).isEmpty());
        this.testGetOffsetsForTimesWithUnknownOffset();
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
        this.testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
    }

    @Test
    public void testGetOffsetsFencedLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(Errors.FENCED_LEADER_EPOCH, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testGetOffsetsUnknownLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(Errors.UNKNOWN_LEADER_EPOCH, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assert.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testGetOffsetsIncludesLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);
        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap(this.topicName, 4), tp -> 99);
        this.client.updateMetadata(metadataResponse);
        this.subscriptions.requestOffsetReset(this.tp0);
        this.fetcher.resetOffsetsIfNeeded();
        MockClient.RequestMatcher matcher = body -> {
            if (body instanceof ListOffsetRequest) {
                ListOffsetRequest offsetRequest = (ListOffsetRequest)body;
                Optional epoch = ((ListOffsetRequest.PartitionData)offsetRequest.partitionTimestamps().get((Object)this.tp0)).currentLeaderEpoch;
                Assert.assertTrue((String)"Expected Fetcher to set leader epoch in request", (boolean)epoch.isPresent());
                Assert.assertEquals((String)"Expected leader epoch to match epoch from metadata update", (long)((Integer)epoch.get()).longValue(), (long)99L);
                return true;
            }
            Assert.fail((String)"Should have seen ListOffsetRequest");
            return false;
        };
        this.client.prepareResponse(matcher, (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
    }

    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
        this.buildFetcher();
        String anotherTopic = "another-topic";
        TopicPartition t2p0 = new TopicPartition("another-topic", 0);
        this.client.reset();
        MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(3, Collections.singletonMap(this.topicName, 2));
        this.client.updateMetadata(initialMetadata);
        this.client.prepareMetadataUpdate(initialMetadata);
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), this.metadata.fetch().leaderFor(this.tp0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, Errors.NONE, 1000L, 32L), this.metadata.fetch().leaderFor(this.tp1));
        HashMap<String, Integer> partitionNumByTopic = new HashMap<String, Integer>();
        partitionNumByTopic.put(this.topicName, 2);
        partitionNumByTopic.put("another-topic", 1);
        MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith(3, partitionNumByTopic);
        this.client.prepareMetadataUpdate(updatedMetadata);
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), this.metadata.fetch().leaderFor(t2p0));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(this.tp0, -1L);
        timestampToSearch.put(this.tp1, -1L);
        timestampToSearch.put(t2p0, -1L);
        Map offsetAndTimestampMap = this.fetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        Assert.assertNotNull((String)("Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp0), offsetAndTimestampMap.get(this.tp0));
        Assert.assertNotNull((String)("Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp1), offsetAndTimestampMap.get(this.tp1));
        Assert.assertNotNull((String)("Expect Fetcher.offsetsForTimes() to return non-null result for " + t2p0), offsetAndTimestampMap.get(t2p0));
        Assert.assertEquals((long)11L, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp0)).offset());
        Assert.assertEquals((long)32L, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp1)).offset());
        Assert.assertEquals((long)54L, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(t2p0)).offset());
    }

    @Test(expected=TimeoutException.class)
    public void testBatchedListOffsetsMetadataErrors() {
        this.buildFetcher();
        HashMap<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
        partitionData.put(this.tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L, Optional.empty()));
        partitionData.put(this.tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1L, -1L, Optional.empty()));
        this.client.prepareResponse((AbstractResponse)new ListOffsetResponse(0, partitionData));
        HashMap<TopicPartition, Long> offsetsToSearch = new HashMap<TopicPartition, Long>();
        offsetsToSearch.put(this.tp0, -2L);
        offsetsToSearch.put(this.tp1, -2L);
        this.fetcher.offsetsForTimes(offsetsToSearch, this.time.timer(0L));
    }

    @Test
    public void testSkippingAbortedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long)currentOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertFalse((boolean)fetchedRecords.containsKey(this.tp0));
    }

    @Test
    public void testReturnCommittedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long)currentOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        currentOffset += this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                FetchRequest request = (FetchRequest)body;
                Assert.assertEquals((Object)IsolationLevel.READ_COMMITTED, (Object)request.isolationLevel());
                return true;
            }
        }, (AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals((long)fetchedRecords.get(this.tp0).size(), (long)2L);
    }

    @Test
    public void testReadCommittedWithCommittedAndAbortedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        long pid1 = 1L;
        long pid2 = 2L;
        this.appendTransactionalRecords(buffer, pid1, 0L, new SimpleRecord("commit1-1".getBytes(), "value".getBytes()), new SimpleRecord("commit1-2".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid2, 2L, new SimpleRecord("abort2-1".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, pid1, 3L);
        this.appendTransactionalRecords(buffer, pid2, 4L, new SimpleRecord("abort2-2".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, pid2, 5L);
        abortedTransactions.add(new FetchResponse.AbortedTransaction(pid2, 2L));
        this.appendTransactionalRecords(buffer, pid1, 6L, new SimpleRecord("abort1-1".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid2, 7L, new SimpleRecord("commit2-1".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid1, 8L, new SimpleRecord("abort1-2".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, pid1, 9L);
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 6L));
        this.commitTransaction(buffer, pid2, 10L);
        buffer.flip();
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)fetchedRecords.containsKey(this.tp0));
        List fetchedConsumerRecords = fetchedRecords.get(this.tp0);
        HashSet<String> fetchedKeys = new HashSet<String>();
        for (ConsumerRecord consumerRecord : fetchedConsumerRecords) {
            fetchedKeys.add(new String((byte[])consumerRecord.key(), StandardCharsets.UTF_8));
        }
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"commit1-1", "commit1-2", "commit2-1"}), fetchedKeys);
    }

    @Test
    public void testMultipleAbortMarkers() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long)currentOffset, new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        currentOffset += this.abortTransaction(buffer, 1L, currentOffset);
        currentOffset += this.abortTransaction(buffer, 1L, currentOffset);
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long)currentOffset, new SimpleRecord(this.time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals((long)fetchedRecords.get(this.tp0).size(), (long)2L);
        List fetchedConsumerRecords = fetchedRecords.get(this.tp0);
        HashSet<String> committedKeys = new HashSet<String>(Arrays.asList("commit1-1", "commit1-2"));
        HashSet<String> actuallyCommittedKeys = new HashSet<String>();
        for (ConsumerRecord consumerRecord : fetchedConsumerRecords) {
            actuallyCommittedKeys.add(new String((byte[])consumerRecord.key(), StandardCharsets.UTF_8));
        }
        Assert.assertTrue((boolean)actuallyCommittedKeys.equals(committedKeys));
    }

    @Test
    public void testReadCommittedAbortMarkerWithNoData() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        long producerId = 1L;
        this.abortTransaction(buffer, producerId, 5L);
        this.appendTransactionalRecords(buffer, producerId, 6L, new SimpleRecord("6".getBytes(), null), new SimpleRecord("7".getBytes(), null), new SimpleRecord("8".getBytes(), null));
        this.commitTransaction(buffer, producerId, 9L);
        buffer.flip();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(producerId, 0L));
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(MemoryRecords.readableRecords((ByteBuffer)buffer), abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map allFetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)allFetchedRecords.containsKey(this.tp0));
        List fetchedRecords = allFetchedRecords.get(this.tp0);
        Assert.assertEquals((long)3L, (long)fetchedRecords.size());
        Assert.assertEquals(Arrays.asList(6L, 7L, 8L), this.collectRecordOffsets(fetchedRecords));
    }

    @Test
    public void testUpdatePositionWithLastRecordMissingFromBatch() {
        this.buildFetcher();
        MemoryRecords records = MemoryRecords.withRecords((CompressionType)CompressionType.NONE, (SimpleRecord[])new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord(null, "value".getBytes())});
        MemoryRecords.FilterResult result = records.filterTo(this.tp0, new MemoryRecords.RecordFilter(){

            protected MemoryRecords.RecordFilter.BatchRetention checkBatchRetention(RecordBatch batch) {
                return MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY;
            }

            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return record.key() != null;
            }
        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        result.outputBuffer().flip();
        MemoryRecords compactedRecords = MemoryRecords.readableRecords((ByteBuffer)result.outputBuffer());
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, compactedRecords, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map allFetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)allFetchedRecords.containsKey(this.tp0));
        List fetchedRecords = allFetchedRecords.get(this.tp0);
        Assert.assertEquals((long)3L, (long)fetchedRecords.size());
        for (int i = 0; i < 3; ++i) {
            Assert.assertEquals((Object)Integer.toString(i), (Object)new String((byte[])fetchedRecords.get(i).key()));
        }
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdatePositionOnEmptyBatch() {
        this.buildFetcher();
        long producerId = 1L;
        short producerEpoch = 0;
        int sequence = 1;
        long baseOffset = 37L;
        long lastOffset = 54L;
        int partitionLeaderEpoch = 7;
        ByteBuffer buffer = ByteBuffer.allocate(61);
        DefaultRecordBatch.writeEmptyHeader((ByteBuffer)buffer, (byte)2, (long)producerId, (short)producerEpoch, (int)sequence, (long)baseOffset, (long)lastOffset, (int)partitionLeaderEpoch, (TimestampType)TimestampType.CREATE_TIME, (long)System.currentTimeMillis(), (boolean)false, (boolean)false);
        buffer.flip();
        MemoryRecords recordsWithEmptyBatch = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, recordsWithEmptyBatch, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map allFetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)allFetchedRecords.isEmpty());
        Assert.assertEquals((long)(lastOffset + 1L), (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testReadCommittedWithCompactedTopic() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        long pid1 = 1L;
        long pid2 = 2L;
        long pid3 = 3L;
        this.appendTransactionalRecords(buffer, pid3, 3L, new SimpleRecord("3".getBytes(), "value".getBytes()), new SimpleRecord("4".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid2, 15L, new SimpleRecord("15".getBytes(), "value".getBytes()), new SimpleRecord("16".getBytes(), "value".getBytes()), new SimpleRecord("17".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid1, 22L, new SimpleRecord("22".getBytes(), "value".getBytes()), new SimpleRecord("23".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, pid2, 28L);
        this.appendTransactionalRecords(buffer, pid3, 30L, new SimpleRecord("30".getBytes(), "value".getBytes()), new SimpleRecord("31".getBytes(), "value".getBytes()), new SimpleRecord("32".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, pid3, 35L);
        this.appendTransactionalRecords(buffer, pid1, 39L, new SimpleRecord("39".getBytes(), "value".getBytes()), new SimpleRecord("40".getBytes(), "value".getBytes()));
        buffer.flip();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(pid2, 6L));
        abortedTransactions.add(new FetchResponse.AbortedTransaction(pid1, 0L));
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(MemoryRecords.readableRecords((ByteBuffer)buffer), abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map allFetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)allFetchedRecords.containsKey(this.tp0));
        List fetchedRecords = allFetchedRecords.get(this.tp0);
        Assert.assertEquals((long)5L, (long)fetchedRecords.size());
        Assert.assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), this.collectRecordOffsets(fetchedRecords));
    }

    @Test
    public void testReturnAbortedTransactionsinUncommittedMode() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long)currentOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)fetchedRecords.containsKey(this.tp0));
    }

    @Test
    public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        long currentOffset = 0L;
        currentOffset += (long)this.appendTransactionalRecords(buffer, 1L, currentOffset, new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        currentOffset += (long)this.abortTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertFalse((boolean)fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals((long)currentOffset, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testConsumingViaIncrementalFetchRequests() {
        this.buildFetcher(2);
        this.assignFromUser(new HashSet<TopicPartition>(Arrays.asList(this.tp0, this.tp1)));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), this.metadata.leaderAndEpoch(this.tp0)));
        this.subscriptions.seekValidated(this.tp1, new SubscriptionState.FetchPosition(1L, Optional.empty(), this.metadata.leaderAndEpoch(this.tp1)));
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions1 = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions1.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 2L, 2L, 0L, null, (BaseRecords)this.records));
        partitions1.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, null, (BaseRecords)this.emptyRecords));
        FetchResponse resp1 = new FetchResponse(Errors.NONE, partitions1, 0, 123);
        this.client.prepareResponse((AbstractResponse)resp1);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertFalse((boolean)fetchedRecords.containsKey(this.tp1));
        List records = fetchedRecords.get(this.tp0);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)3L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assert.assertEquals((long)1L, (long)records.get(0).offset());
        Assert.assertEquals((long)2L, (long)records.get(1).offset());
        Assert.assertEquals((long)0L, (long)this.fetcher.sendFetches());
        fetchedRecords = this.fetchedRecords();
        Assert.assertFalse((boolean)fetchedRecords.containsKey(this.tp1));
        records = fetchedRecords.get(this.tp0);
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((long)3L, (long)records.get(0).offset());
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        LinkedHashMap partitions2 = new LinkedHashMap();
        FetchResponse resp2 = new FetchResponse(Errors.NONE, partitions2, 0, 123);
        this.client.prepareResponse((AbstractResponse)resp2);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        fetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)fetchedRecords.isEmpty());
        Assert.assertEquals((long)4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions3 = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions3.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, 4L, 0L, null, (BaseRecords)this.nextRecords));
        FetchResponse resp3 = new FetchResponse(Errors.NONE, partitions3, 0, 123);
        this.client.prepareResponse((AbstractResponse)resp3);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        fetchedRecords = this.fetchedRecords();
        Assert.assertFalse((boolean)fetchedRecords.containsKey(this.tp1));
        records = fetchedRecords.get(this.tp0);
        Assert.assertEquals((long)2L, (long)records.size());
        Assert.assertEquals((long)6L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assert.assertEquals((long)1L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assert.assertEquals((long)4L, (long)records.get(0).offset());
        Assert.assertEquals((long)5L, (long)records.get(1).offset());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFetcherConcurrency() throws Exception {
        int numPartitions = 20;
        HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>();
        for (int i = 0; i < numPartitions; ++i) {
            topicPartitions.add(new TopicPartition(this.topicName, i));
        }
        LogContext logContext = new LogContext();
        this.buildDependencies(new MetricConfig(), Long.MAX_VALUE, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST), logContext);
        this.fetcher = new Fetcher<byte[], byte[]>(new LogContext(), this.consumerClient, this.minBytes, this.maxBytes, this.maxWaitMs, this.fetchSize, 2 * numPartitions, true, "", (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), this.metadata, this.subscriptions, this.metrics, this.metricsRegistry, (Time)this.time, this.retryBackoffMs, this.requestTimeoutMs, IsolationLevel.READ_UNCOMMITTED, this.apiVersions){

            protected FetchSessionHandler sessionHandler(int id) {
                final FetchSessionHandler handler = super.sessionHandler(id);
                if (handler == null) {
                    return null;
                }
                return new FetchSessionHandler(new LogContext(), id){

                    public FetchSessionHandler.Builder newBuilder() {
                        this.verifySessionPartitions();
                        return handler.newBuilder();
                    }

                    public boolean handleResponse(FetchResponse response) {
                        this.verifySessionPartitions();
                        return handler.handleResponse(response);
                    }

                    public void handleError(Throwable t) {
                        this.verifySessionPartitions();
                        handler.handleError(t);
                    }

                    private void verifySessionPartitions() {
                        try {
                            Field field = FetchSessionHandler.class.getDeclaredField("sessionPartitions");
                            field.setAccessible(true);
                            LinkedHashMap sessionPartitions = (LinkedHashMap)field.get(handler);
                            for (Map.Entry entry : sessionPartitions.entrySet()) {
                                Thread.yield();
                            }
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
            }
        };
        MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, numPartitions));
        this.client.updateMetadata(initialMetadataResponse);
        this.fetchSize = 10000;
        this.assignFromUser(topicPartitions);
        topicPartitions.forEach(tp -> this.subscriptions.seek(tp, 0L));
        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
        this.executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = this.executorService.submit(() -> {
            while (fetchesRemaining.get() > 0) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    if (!this.client.requests().isEmpty()) {
                        ClientRequest request = this.client.requests().peek();
                        FetchRequest fetchRequest = (FetchRequest)request.requestBuilder().build();
                        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseMap = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
                        for (Map.Entry entry : fetchRequest.fetchData().entrySet()) {
                            TopicPartition tp = (TopicPartition)entry.getKey();
                            long offset = ((FetchRequest.PartitionData)entry.getValue()).fetchOffset;
                            responseMap.put(tp, new FetchResponse.PartitionData(Errors.NONE, offset + 2L, offset + 2L, 0L, null, (BaseRecords)this.buildRecords(offset, 2, offset)));
                        }
                        this.client.respondToRequest(request, (AbstractResponse)new FetchResponse(Errors.NONE, responseMap, 0, 123));
                        this.consumerClient.poll(this.time.timer(0L));
                    }
                }
            }
            return fetchesRemaining.get();
        });
        Map nextFetchOffsets = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), t -> 0L));
        while (fetchesRemaining.get() > 0 && !future.isDone()) {
            Map fetchedRecords;
            if (this.fetcher.sendFetches() == 1) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    this.consumerClient.poll(this.time.timer(0L));
                }
            }
            if (!this.fetcher.hasCompletedFetches() || (fetchedRecords = this.fetchedRecords()).isEmpty()) continue;
            fetchesRemaining.decrementAndGet();
            fetchedRecords.entrySet().forEach(entry -> {
                TopicPartition tp = (TopicPartition)entry.getKey();
                List records = (List)entry.getValue();
                Assert.assertEquals((long)2L, (long)records.size());
                long nextOffset = (Long)nextFetchOffsets.get(tp);
                Assert.assertEquals((long)nextOffset, (long)((ConsumerRecord)records.get(0)).offset());
                Assert.assertEquals((long)(nextOffset + 1L), (long)((ConsumerRecord)records.get(1)).offset());
                nextFetchOffsets.put(tp, nextOffset + 2L);
            });
        }
        Assert.assertEquals((Object)0, (Object)future.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFetcherSessionEpochUpdate() throws Exception {
        this.buildFetcher(2);
        MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, 1));
        this.client.updateMetadata(initialMetadataResponse);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
        this.executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = this.executorService.submit(() -> {
            long nextOffset = 0L;
            long nextEpoch = 0L;
            while (fetchesRemaining.get() > 0) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    if (!this.client.requests().isEmpty()) {
                        ClientRequest request = this.client.requests().peek();
                        FetchRequest fetchRequest = (FetchRequest)request.requestBuilder().build();
                        int epoch = fetchRequest.metadata().epoch();
                        Assert.assertTrue((String)String.format("Unexpected epoch expected %d got %d", nextEpoch, epoch), (epoch == 0 || (long)epoch == nextEpoch ? 1 : 0) != 0);
                        ++nextEpoch;
                        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseMap = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
                        responseMap.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, nextOffset + 2L, nextOffset + 2L, 0L, null, (BaseRecords)this.buildRecords(nextOffset, 2, nextOffset)));
                        nextOffset += 2L;
                        this.client.respondToRequest(request, (AbstractResponse)new FetchResponse(Errors.NONE, responseMap, 0, 123));
                        this.consumerClient.poll(this.time.timer(0L));
                    }
                }
            }
            return fetchesRemaining.get();
        });
        long nextFetchOffset = 0L;
        while (fetchesRemaining.get() > 0 && !future.isDone()) {
            if (this.fetcher.sendFetches() == 1) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    this.consumerClient.poll(this.time.timer(0L));
                }
            }
            if (!this.fetcher.hasCompletedFetches()) continue;
            Map fetchedRecords = this.fetchedRecords();
            if (!fetchedRecords.isEmpty()) {
                fetchesRemaining.decrementAndGet();
                List records = fetchedRecords.get(this.tp0);
                Assert.assertEquals((long)2L, (long)records.size());
                Assert.assertEquals((long)nextFetchOffset, (long)records.get(0).offset());
                Assert.assertEquals((long)(nextFetchOffset + 1L), (long)records.get(1).offset());
                nextFetchOffset += 2L;
            }
            Assert.assertTrue((boolean)this.fetchedRecords().isEmpty());
        }
        Assert.assertEquals((Object)0, (Object)future.get());
    }

    @Test
    public void testEmptyControlBatch() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int currentOffset = 1;
        DefaultRecordBatch.writeEmptyHeader((ByteBuffer)buffer, (byte)2, (long)1L, (short)0, (int)-1, (long)0L, (long)0L, (int)-1, (TimestampType)TimestampType.CREATE_TIME, (long)this.time.milliseconds(), (boolean)true, (boolean)true);
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long)currentOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                FetchRequest request = (FetchRequest)body;
                Assert.assertEquals((Object)IsolationLevel.READ_COMMITTED, (Object)request.isolationLevel());
                return true;
            }
        }, (AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals((long)fetchedRecords.get(this.tp0).size(), (long)2L);
    }

    private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)baseOffset);
        for (int i = 0; i < count; ++i) {
            builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + (long)i)).getBytes());
        }
        return builder.build();
    }

    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, int baseSequence, SimpleRecord ... records) {
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)baseOffset, (long)this.time.milliseconds(), (long)pid, (short)0, (int)baseSequence, (boolean)true, (int)-1);
        for (SimpleRecord record : records) {
            builder.append(record);
        }
        builder.build();
        return records.length;
    }

    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord ... records) {
        return this.appendTransactionalRecords(buffer, pid, baseOffset, (int)baseOffset, records);
    }

    private int commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
        short producerEpoch = 0;
        int partitionLeaderEpoch = 0;
        MemoryRecords.writeEndTransactionalMarker((ByteBuffer)buffer, (long)baseOffset, (long)this.time.milliseconds(), (int)partitionLeaderEpoch, (long)producerId, (short)producerEpoch, (EndTransactionMarker)new EndTransactionMarker(ControlRecordType.COMMIT, 0));
        return 1;
    }

    private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
        short producerEpoch = 0;
        int partitionLeaderEpoch = 0;
        MemoryRecords.writeEndTransactionalMarker((ByteBuffer)buffer, (long)baseOffset, (long)this.time.milliseconds(), (int)partitionLeaderEpoch, (long)producerId, (short)producerEpoch, (EndTransactionMarker)new EndTransactionMarker(ControlRecordType.ABORT, 0));
        return 1;
    }

    private void testGetOffsetsForTimesWithError(Errors errorForP0, Errors errorForP1, long offsetForP0, long offsetForP1, Long expectedOffsetForP0, Long expectedOffsetForP1) {
        this.client.reset();
        String topicName2 = "topic2";
        TopicPartition t2p0 = new TopicPartition(topicName2, 0);
        this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), (ClientDnsLookup)ClientDnsLookup.DEFAULT), this.time.milliseconds());
        HashMap<String, Integer> partitionNumByTopic = new HashMap<String, Integer>();
        partitionNumByTopic.put(this.topicName, 2);
        partitionNumByTopic.put(topicName2, 1);
        MetadataResponse updateMetadataResponse = TestUtils.metadataUpdateWith(2, partitionNumByTopic);
        Cluster updatedCluster = updateMetadataResponse.cluster();
        this.client.prepareMetadataUpdate(updateMetadataResponse, true);
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), updatedCluster.leaderFor(t2p0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, errorForP1, offsetForP1, offsetForP1), updatedCluster.leaderFor(this.tp1));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), updatedCluster.leaderFor(t2p0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, Errors.NONE, offsetForP1, offsetForP1), updatedCluster.leaderFor(this.tp1));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(t2p0, 0L);
        timestampToSearch.put(this.tp1, 0L);
        Map offsetAndTimestampMap = this.fetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        if (expectedOffsetForP0 == null) {
            Assert.assertNull(offsetAndTimestampMap.get(t2p0));
        } else {
            Assert.assertEquals((long)expectedOffsetForP0, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(t2p0)).timestamp());
            Assert.assertEquals((long)expectedOffsetForP0, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(t2p0)).offset());
        }
        if (expectedOffsetForP1 == null) {
            Assert.assertNull(offsetAndTimestampMap.get(this.tp1));
        } else {
            Assert.assertEquals((long)expectedOffsetForP1, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp1)).timestamp());
            Assert.assertEquals((long)expectedOffsetForP1, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp1)).offset());
        }
    }

    private void testGetOffsetsForTimesWithUnknownOffset() {
        this.client.reset();
        MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, 1));
        this.client.updateMetadata(initialMetadataUpdate);
        HashMap<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
        partitionData.put(this.tp0, new ListOffsetResponse.PartitionData(Errors.NONE, -1L, -1L, Optional.empty()));
        this.client.prepareResponseFrom((AbstractResponse)new ListOffsetResponse(0, partitionData), this.metadata.fetch().leaderFor(this.tp0));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(this.tp0, 0L);
        Map offsetAndTimestampMap = this.fetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        Assert.assertTrue((boolean)offsetAndTimestampMap.containsKey(this.tp0));
        Assert.assertNull(offsetAndTimestampMap.get(this.tp0));
    }

    @Test
    public void testSubscriptionPositionUpdatedWithEpoch() {
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L, (long)-1L, (long)-1L, (short)-1, (int)-1, (boolean)false, (int)1);
        builder.appendWithOffset(0L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(1L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords records = builder.build();
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1);
        this.metadata.update(metadataResponse, 0L);
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, records, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assert.assertEquals((long)this.subscriptions.position((TopicPartition)this.tp0).offset, (long)3L);
        TestUtils.assertOptional(this.subscriptions.position((TopicPartition)this.tp0).offsetEpoch, value -> Assert.assertEquals((long)value.intValue(), (long)1L));
    }

    @Test
    public void testOffsetValidationAwaitsNodeApiVersion() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        this.metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1), 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        Assert.assertFalse((boolean)this.client.isConnected(node.idString()));
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.leaderAndEpoch((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(20L, Optional.of(1), leaderAndEpoch));
        Assert.assertFalse((boolean)this.client.isConnected(node.idString()));
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertTrue((boolean)this.client.isConnected(node.idString()));
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        HashMap<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<TopicPartition, EpochEndOffset>();
        endOffsetMap.put(this.tp0, new EpochEndOffset(Errors.NONE, 1, 30L));
        OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(endOffsetMap);
        this.client.prepareResponseFrom((AbstractResponse)resp, node);
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertEquals((long)20L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testOffsetValidationSkippedForOldBroker() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        int epochTwo = 2;
        this.metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1), 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create(Collections.singleton(new ApiVersionsResponse.ApiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH, 0, 2))));
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.leaderAndEpoch((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2), 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        this.metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1), 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.leaderAndEpoch((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(5L, Optional.of(1), leaderAndEpoch));
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        this.client.respond(request -> {
            OffsetsForLeaderEpochRequest epochRequest = (OffsetsForLeaderEpochRequest)request;
            OffsetsForLeaderEpochRequest.PartitionData partitionData = (OffsetsForLeaderEpochRequest.PartitionData)epochRequest.epochsByTopicPartition().get(this.tp0);
            return partitionData.currentLeaderEpoch.equals(Optional.of(1)) && partitionData.leaderEpoch == 1;
        }, (AbstractResponse)new OffsetsForLeaderEpochResponse(Collections.singletonMap(this.tp0, new EpochEndOffset(0, 0L))));
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationFencing() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        int epochTwo = 2;
        int epochThree = 3;
        this.metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1), 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.leaderAndEpoch((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2), 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        this.subscriptions.completeValidation(this.tp0);
        SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(10L, Optional.of(2), new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(2)));
        this.subscriptions.position(this.tp0, nextPosition);
        this.subscriptions.maybeValidatePositionForCurrentLeader(this.tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(3)));
        HashMap<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<TopicPartition, EpochEndOffset>();
        endOffsetMap.put(this.tp0, new EpochEndOffset(Errors.NONE, 2, 10L));
        OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(endOffsetMap);
        this.client.prepareResponse((AbstractResponse)resp);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((String)"Expected validation to fail since leader epoch changed", (boolean)this.subscriptions.awaitingValidation(this.tp0));
        this.fetcher.validateOffsetsIfNeeded();
        endOffsetMap.clear();
        endOffsetMap.put(this.tp0, new EpochEndOffset(Errors.NONE, 3, 10L));
        resp = new OffsetsForLeaderEpochResponse(endOffsetMap);
        this.client.prepareResponse((AbstractResponse)resp);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((String)"Expected validation to succeed with latest epoch", (boolean)this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testTruncationDetected() {
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L, (long)-1L, (long)-1L, (short)-1, (int)-1, (boolean)false, (int)1);
        builder.appendWithOffset(0L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(1L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords records = builder.build();
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2);
        this.metadata.update(metadataResponse, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.leaderAndEpoch((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertEquals((long)0L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        HashMap<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<TopicPartition, EpochEndOffset>();
        endOffsetMap.put(this.tp0, new EpochEndOffset(Errors.NONE, 1, 10L));
        OffsetsForLeaderEpochResponse resp = new OffsetsForLeaderEpochResponse(endOffsetMap);
        this.client.prepareResponse((AbstractResponse)resp);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, records, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assert.assertEquals((long)this.subscriptions.position((TopicPartition)this.tp0).offset, (long)3L);
        TestUtils.assertOptional(this.subscriptions.position((TopicPartition)this.tp0).offsetEpoch, value -> Assert.assertEquals((long)value.intValue(), (long)1L));
    }

    @Test
    public void testPreferredReadReplica() {
        this.buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, (Deserializer)new BytesDeserializer(), (Deserializer)new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5L).toMillis());
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(TestUtils.metadataUpdateWith(2, Collections.singletonMap(this.topicName, 4)));
        this.subscriptions.seek(this.tp0, 0L);
        Node selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assert.assertEquals((long)selected.id(), (long)-1L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(1)));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map partitionRecords = this.fetchedRecords();
        Assert.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assert.assertEquals((long)selected.id(), (long)1L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(2)));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        this.fetchedRecords();
        selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assert.assertEquals((long)selected.id(), (long)-1L);
    }

    @Test
    public void testPreferredReadReplicaOffsetError() {
        this.buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, (Deserializer)new BytesDeserializer(), (Deserializer)new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5L).toMillis());
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(TestUtils.metadataUpdateWith(2, Collections.singletonMap(this.topicName, 4)));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(1)));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        this.fetchedRecords();
        Node selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assert.assertEquals((long)selected.id(), (long)1L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        Assert.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, 0, Optional.empty()));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        this.fetchedRecords();
        selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assert.assertEquals((long)selected.id(), (long)-1L);
    }

    @Test
    public void testFetchCompletedBeforeHandlerAdded() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tp0, this.buildRecords(1L, 1, 1L), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetchedRecords();
        Node node = this.fetcher.selectReadReplica(this.tp0, this.subscriptions.position((TopicPartition)this.tp0).currentLeader.leader, this.time.milliseconds());
        AtomicBoolean wokenUp = new AtomicBoolean(false);
        this.client.setWakeupHook(() -> {
            if (!wokenUp.getAndSet(true)) {
                this.consumerClient.disconnectAsync(node);
                this.consumerClient.poll(this.time.timer(0L));
            }
        });
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.consumerClient.disconnectAsync(node);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
    }

    private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                ListOffsetRequest req = (ListOffsetRequest)body;
                return timestamp == ((ListOffsetRequest.PartitionData)req.partitionTimestamps().get((Object)((FetcherTest)FetcherTest.this).tp0)).timestamp;
            }
        };
    }

    private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
        return this.listOffsetResponse(this.tp0, error, timestamp, offset);
    }

    private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp, offset, Optional.empty());
        HashMap<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
        allPartitionData.put(tp, partitionData);
        return new ListOffsetResponse(allPartitionData);
    }

    private FetchResponse<MemoryRecords> fullFetchResponseWithAbortedTransactions(MemoryRecords records, List<FetchResponse.AbortedTransaction> abortedTransactions, Errors error, long lastStableOffset, long hw, int throttleTime) {
        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(this.tp0, new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, abortedTransactions, (BaseRecords)records));
        return new FetchResponse(Errors.NONE, new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(partitions), throttleTime, 0);
    }

    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
        return this.fullFetchResponse(tp, records, error, hw, -1L, throttleTime);
    }

    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, null, (BaseRecords)records));
        return new FetchResponse(Errors.NONE, new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(partitions), throttleTime, 0);
    }

    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime, Optional<Integer> preferredReplicaId) {
        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, preferredReplicaId, null, (BaseRecords)records));
        return new FetchResponse(Errors.NONE, new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(partitions), throttleTime, 0);
    }

    private FetchResponse<MemoryRecords> fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, (BaseRecords)records));
        return new FetchResponse(Errors.NONE, new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(partitions), throttleTime, 0);
    }

    private MetadataResponse newMetadataResponse(String topic, Errors error) {
        ArrayList partitionsMetadata = new ArrayList();
        if (error == Errors.NONE) {
            Optional<MetadataResponse.TopicMetadata> foundMetadata = this.initialUpdateResponse.topicMetadata().stream().filter(topicMetadata -> topicMetadata.topic().equals(topic)).findFirst();
            foundMetadata.ifPresent(topicMetadata -> partitionsMetadata.addAll(topicMetadata.partitionMetadata()));
        }
        MetadataResponse.TopicMetadata topicMetadata2 = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
        ArrayList brokers = new ArrayList(this.initialUpdateResponse.brokers());
        return MetadataResponse.prepareResponse(brokers, (String)this.initialUpdateResponse.clusterId(), (int)this.initialUpdateResponse.controller().id(), Collections.singletonList(topicMetadata2));
    }

    private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        return this.fetcher.fetchedRecords();
    }

    private void buildFetcher(int maxPollRecords) {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), maxPollRecords, IsolationLevel.READ_UNCOMMITTED);
    }

    private void buildFetcher() {
        this.buildFetcher(Integer.MAX_VALUE);
    }

    private void buildFetcher(Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
    }

    private <K, V> void buildFetcher(OffsetResetStrategy offsetResetStrategy, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel) {
        this.buildFetcher(new MetricConfig(), offsetResetStrategy, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel);
    }

    private <K, V> void buildFetcher(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel) {
        this.buildFetcher(metricConfig, offsetResetStrategy, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel, Long.MAX_VALUE);
    }

    private <K, V> void buildFetcher(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel, long metadataExpireMs) {
        LogContext logContext = new LogContext();
        SubscriptionState subscriptionState = new SubscriptionState(logContext, offsetResetStrategy);
        this.buildFetcher(metricConfig, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel, metadataExpireMs, subscriptionState, logContext);
    }

    private void buildFetcher(SubscriptionState subscriptionState, LogContext logContext) {
        this.buildFetcher(new MetricConfig(), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Long.MAX_VALUE, subscriptionState, logContext);
    }

    private <K, V> void buildFetcher(MetricConfig metricConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        this.buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext);
        this.fetcher = new Fetcher(new LogContext(), this.consumerClient, this.minBytes, this.maxBytes, this.maxWaitMs, this.fetchSize, maxPollRecords, true, "", keyDeserializer, valueDeserializer, this.metadata, this.subscriptions, this.metrics, this.metricsRegistry, (Time)this.time, this.retryBackoffMs, this.requestTimeoutMs, isolationLevel, this.apiVersions);
    }

    private void buildDependencies(MetricConfig metricConfig, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        this.time = new MockTime(1L);
        this.subscriptions = subscriptionState;
        this.metadata = new ConsumerMetadata(0L, metadataExpireMs, false, false, this.subscriptions, logContext, new ClusterResourceListeners());
        this.client = new MockClient((Time)this.time, (Metadata)this.metadata);
        this.metrics = new Metrics(metricConfig, (Time)this.time);
        this.consumerClient = new ConsumerNetworkClient(logContext, (KafkaClient)this.client, (Metadata)this.metadata, (Time)this.time, 100L, 1000, Integer.MAX_VALUE);
        this.metricsRegistry = new FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumer" + this.groupId);
    }

    private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
        return records.stream().map(ConsumerRecord::offset).collect(Collectors.toList());
    }
}

