/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.yammer.metrics.core.Meter;
import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.api.KAFKA_2_6_IV0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.Log;
import kafka.log.LogAppendInfo;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.Fetching$;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.OffsetAndEpoch;
import kafka.server.PartitionFetchState;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsManager;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Set;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0005\t]a\u0001\u0002\u0015*\u00019BQ!\u000e\u0001\u0005\u0002YBq!\u000f\u0001C\u0002\u0013%!\b\u0003\u0004G\u0001\u0001\u0006Ia\u000f\u0005\b\u000f\u0002\u0011\r\u0011\"\u0003;\u0011\u0019A\u0005\u0001)A\u0005w!9\u0011\n\u0001b\u0001\n\u0013Q\u0004B\u0002&\u0001A\u0003%1\bC\u0004L\u0001\t\u0007I\u0011\u0002'\t\rM\u0003\u0001\u0015!\u0003N\u0011\u001d!\u0006A1A\u0005\nUCa!\u0017\u0001!\u0002\u00131\u0006\"\u0002.\u0001\t\u0013Y\u0006bB5\u0001#\u0003%IA\u001b\u0005\u0006k\u0002!\tA\u001e\u0005\u0007\u0003\u0017\u0001A\u0011\u0001<\t\r\u0005U\u0001\u0001\"\u0001w\u0011\u001d\tI\u0002\u0001C\u0001\u00037Aa!!\u000f\u0001\t\u00031\bBBA\u001f\u0001\u0011\u0005a\u000f\u0003\u0004\u0002B\u0001!\tA\u001e\u0005\b\u0003\u000b\u0002A\u0011BA$\u0011!\tY\u0006AI\u0001\n\u0013Q\u0007BBA/\u0001\u0011\u0005a\u000f\u0003\u0004\u0002b\u0001!\tA\u001e\u0005\u0007\u0003K\u0002A\u0011\u0001<\t\r\u0005%\u0004\u0001\"\u0001w\u0011\u0019\ti\u0007\u0001C\u0001m\"1\u0011\u0011\u000f\u0001\u0005\u0002YDa!!\u001e\u0001\t\u00031\bBBA=\u0001\u0011\u0005a\u000f\u0003\u0004\u0002~\u0001!\tA\u001e\u0005\u0007\u0003\u0003\u0003A\u0011\u0001<\t\r\u0005\u0015\u0005\u0001\"\u0001w\u0011\u0019\tI\t\u0001C\u0001m\"9\u0011Q\u0012\u0001\u0005\n\u0005=\u0005bBAG\u0001\u0011%\u0011q\u0019\u0005\b\u0003?\u0004A\u0011BAq\u0011\u001d\t9\u000f\u0001C\u0001\u0003SDqA!\u0004\u0001\t\u0013\u0011yA\u0001\rSKBd\u0017nY1GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a+fgRT!AK\u0016\u0002\rM,'O^3s\u0015\u0005a\u0013!B6bM.\f7\u0001A\n\u0003\u0001=\u0002\"\u0001M\u001a\u000e\u0003ER\u0011AM\u0001\u0006g\u000e\fG.Y\u0005\u0003iE\u0012a!\u00118z%\u00164\u0017A\u0002\u001fj]&$h\bF\u00018!\tA\u0004!D\u0001*\u0003\u0011!\u0018\u0007\u001d\u0019\u0016\u0003m\u0002\"\u0001\u0010#\u000e\u0003uR!AP \u0002\r\r|W.\\8o\u0015\ta\u0003I\u0003\u0002B\u0005\u00061\u0011\r]1dQ\u0016T\u0011aQ\u0001\u0004_J<\u0017BA#>\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\fQ\u0001^\u0019qa\u0001\nA\u0001^\u0019qc\u0005)A/\r92A\u0005!AO\r92\u0003\u0015!(\u0007]\u0019!\u00039\u0011'o\\6fe\u0016sG\rU8j]R,\u0012!\u0014\t\u0003\u001dFk\u0011a\u0014\u0006\u0003!.\nqa\u00197vgR,'/\u0003\u0002S\u001f\nq!I]8lKJ,e\u000e\u001a)pS:$\u0018a\u00042s_.,'/\u00128e!>Lg\u000e\u001e\u0011\u0002!\u0019\f\u0017\u000e\\3e!\u0006\u0014H/\u001b;j_:\u001cX#\u0001,\u0011\u0005a:\u0016B\u0001-*\u0005A1\u0015-\u001b7fIB\u000b'\u000f^5uS>t7/A\tgC&dW\r\u001a)beRLG/[8og\u0002\n\u0011#\u001b8ji&\fGNR3uG\"\u001cF/\u0019;f)\rav\f\u001a\t\u0003quK!AX\u0015\u0003#%s\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\rC\u0003a\u0019\u0001\u0007\u0011-A\u0006gKR\u001c\u0007n\u00144gg\u0016$\bC\u0001\u0019c\u0013\t\u0019\u0017G\u0001\u0003M_:<\u0007bB3\r!\u0003\u0005\rAZ\u0001\fY\u0016\fG-\u001a:Fa>\u001c\u0007\u000e\u0005\u00021O&\u0011\u0001.\r\u0002\u0004\u0013:$\u0018aG5oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\u0012\"WMZ1vYR$#'F\u0001lU\t1GnK\u0001n!\tq7/D\u0001p\u0015\t\u0001\u0018/A\u0005v]\u000eDWmY6fI*\u0011!/M\u0001\u000bC:tw\u000e^1uS>t\u0017B\u0001;p\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\bG2,\u0017M\\;q)\u00059\bC\u0001\u0019y\u0013\tI\u0018G\u0001\u0003V]&$\bF\u0001\b|!\ra\u0018qA\u0007\u0002{*\u0011ap`\u0001\u0004CBL'\u0002BA\u0001\u0003\u0007\tqA[;qSR,'OC\u0002\u0002\u0006\t\u000bQA[;oSRL1!!\u0003~\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u0015tQ>,H\u000eZ*f]\u0012d\u0015\r^3tiJ+\u0017/^3tiZ+'o]5p]N\u0014\u0015\u0010R3gCVdG\u000fK\u0002\u0010\u0003\u001f\u00012\u0001`A\t\u0013\r\t\u0019\" \u0002\u0005)\u0016\u001cH/\u0001 uKN$h)\u001a;dQ2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f^%g\u0019\u0006\u001cH/\u00129pG\"$UMZ5oK\u00124uN]*p[\u0016\u0004\u0016M\u001d;ji&|gn\u001d\u0015\u0004!\u0005=\u0011!F1tg\u0016\u0014H\u000fU1si&$\u0018n\u001c8Ti\u0006$Xm\u001d\u000b\no\u0006u\u0011qEA\u0019\u0003kAq!a\b\u0012\u0001\u0004\t\t#A\u0004gKR\u001c\u0007.\u001a:\u0011\u0007a\n\u0019#C\u0002\u0002&%\u0012Q#\u00112tiJ\f7\r\u001e$fi\u000eDWM\u001d+ie\u0016\fG\rC\u0004\u0002*E\u0001\r!a\u000b\u0002+MDw.\u001e7e\u0005\u0016\u0014V-\u00193z\r>\u0014h)\u001a;dQB\u0019\u0001'!\f\n\u0007\u0005=\u0012GA\u0004C_>dW-\u00198\t\u000f\u0005M\u0012\u00031\u0001\u0002,\u0005)2\u000f[8vY\u0012\u0014U\r\u0016:v]\u000e\fG/\u001b8h\u0019><\u0007bBA\u001c#\u0001\u0007\u00111F\u0001\u0010g\"|W\u000f\u001c3CK\u0012+G.Y=fI\u0006)3\u000f[8vY\u0012D\u0015M\u001c3mK\u0016C8-\u001a9uS>tgI]8n\u00052|7m[5oON+g\u000e\u001a\u0015\u0004%\u0005=\u0011aQ:i_VdGMR3uG\"dU-\u00193fe\u0016\u0003xn\u00195P]\u001aK'o\u001d;GKR\u001c\u0007n\u00148ms&3G*Z1eKJ,\u0005o\\2i\u0017:|wO\u001c+p\u0005>$\b.\u00132qeYB3aEA\b\u0003a\u001a\bn\\;mI:{GOR3uG\"dU-\u00193fe\u0016\u0003xn\u00195P]\u001aK'o\u001d;GKR\u001c\u0007nV5uQR\u0013XO\\2bi\u0016|eNR3uG\"D3\u0001FA\b\u0003\t2XM]5gs\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQR)q/!\u0013\u0002X!9\u00111J\u000bA\u0002\u00055\u0013aA5caB!\u0011qJA*\u001b\t\t\tF\u0003\u0002\u007fW%!\u0011QKA)\u0005)\t\u0005/\u001b,feNLwN\u001c\u0005\t\u00033*\u0002\u0013!a\u0001M\u0006yQ\r]8dQ\u001a+Go\u00195D_VtG/\u0001\u0017wKJLg-\u001f$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r\u001b\u0013eK\u001a\fW\u000f\u001c;%e\u0005!4\u000f[8vY\u0012$&/\u001e8dCR,Gk\\(gMN,Go\u00159fG&4\u0017.\u001a3J]\u0016\u0003xn\u00195PM\u001a\u001cX\r\u001e*fgB|gn]3)\u0007]\ty!A'tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_>3gm]3u'B,7-\u001b4jK\u0012Le.\u00129pG\"|eMZ:fiJ+7\u000f]8og\u0016LeMR8mY><XM\u001d%bg:{Wj\u001c:f\u000bB|7\r[:)\u0007a\ty!\u0001&tQ>,H\u000e\u001a$fi\u000eDG*Z1eKJ,\u0005o\\2i'\u0016\u001cwN\u001c3US6,\u0017J\u001a'fC\u0012,'OU3qY&,7oV5uQ\u0016\u0003xn\u00195O_R\\en\\<o)>4u\u000e\u001c7po\u0016\u0014\bfA\r\u0002\u0010\u0005\t5\u000f[8vY\u0012$&/\u001e8dCR,\u0017J\u001a'fC\u0012,'OU3qY&,7oV5uQ\u0012Kg/\u001a:hS:<W\t]8dQ:{Go\u00138po:$vNR8mY><XM\u001d\u0015\u00045\u0005=\u0011aM:i_VdG-V:f\u0019\u0016\fG-\u001a:F]\u0012|eMZ:fi&3\u0017J\u001c;fe\n\u0013xn[3s-\u0016\u00148/[8o\u0005\u0016dwn\u001e\u001a1Q\rY\u0012qB\u0001Ag\"|W\u000f\u001c3UeVt7-\u0019;f)>Le.\u001b;jC24U\r^2i\u001f\u001a47/\u001a;JM2+\u0017\rZ3s%\u0016$XO\u001d8t+:$WMZ5oK\u0012|eMZ:fi\"\u001aA$a\u0004\u0002cMDw.\u001e7e!>dG.\u00138eK\u001aLg.\u001b;fYfLe\rT3bI\u0016\u0014(+\u001a;ve:\u001c\u0018I\\=Fq\u000e,\u0007\u000f^5p]\"\u001aQ$a\u0004\u0002WMDw.\u001e7e\u001b>4X\rU1si&$\u0018n\u001c8t\u001fV$xJ\u001a+sk:\u001c\u0017\r^5oO2{wm\u0015;bi\u0016D3AHA\b\u0003a\u001a\bn\\;mI\u001aKG\u000e^3s!\u0006\u0014H/\u001b;j_:\u001cX*\u00193f\u0019\u0016\fG-\u001a:EkJLgn\u001a'fC\u0012,'/\u00129pG\"\u0014V-];fgRD3aHA\b\u0003!\u001b\bn\\;mI\u000e\u000bGo\u00195Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$w\u000b[3o'\",H\u000f^5oO\u0012{wO\u001c*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\rK\u0002!\u0003\u001f\tae\u001d5pk2$W\u000b\u001d3bi\u0016\u0014V-Y:tS\u001etW.\u001a8u\u0005f$Xm]%o\u001b\u0016$(/[2tQ\r\t\u0013qB\u0001Gg\"|W\u000f\u001c3O_R,\u0006\u000fZ1uKJ+\u0017m]:jO:lWM\u001c;CsR,7/\u00138NKR\u0014\u0018nY:XQ\u0016tgj\u001c*fCN\u001c\u0018n\u001a8nK:$8/\u00138Qe><'/Z:tQ\r\u0011\u0013qB\u0001\"]\u0016<xJ\u001a4tKR4uN\u001d'fC\u0012,'\u000fU1si&$\u0018n\u001c8SKN,H\u000e\u001e\u000b\t\u0003#\u000bi,!1\u0002DB!\u00111SA\\\u001d\u0011\t)*!-\u000f\t\u0005]\u0015Q\u0016\b\u0005\u00033\u000bYK\u0004\u0003\u0002\u001c\u0006%f\u0002BAO\u0003OsA!a(\u0002&6\u0011\u0011\u0011\u0015\u0006\u0004\u0003Gk\u0013A\u0002\u001fs_>$h(C\u0001D\u0013\t\t%)\u0003\u0002-\u0001&\u0011ahP\u0005\u0004\u0003_k\u0014aB7fgN\fw-Z\u0005\u0005\u0003g\u000b),\u0001\u0011PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fgB|gn]3ECR\f'bAAX{%!\u0011\u0011XA^\u00059)\u0005o\\2i\u000b:$wJ\u001a4tKRTA!a-\u00026\"1\u0011qX\u0012A\u0002m\n!\u0001\u001e9\t\u000b\u0015\u001c\u0003\u0019\u00014\t\r\u0005\u00157\u00051\u0001b\u0003%)g\u000eZ(gMN,G\u000f\u0006\u0006\u0002\u0012\u0006%\u00171ZAn\u0003;Da!a0%\u0001\u0004Y\u0004bBAgI\u0001\u0007\u0011qZ\u0001\u0006KJ\u0014xN\u001d\t\u0005\u0003#\f9.\u0004\u0002\u0002T*\u0019\u0011Q[\u001f\u0002\u0011A\u0014x\u000e^8d_2LA!!7\u0002T\n1QI\u001d:peNDQ!\u001a\u0013A\u0002\u0019Da!!2%\u0001\u0004\t\u0017AH1tg\u0016\u0014H\u000f\u0015:pG\u0016\u001c8\u000fU1si&$\u0018n\u001c8ECR\fw\u000b[3o)\r9\u00181\u001d\u0005\b\u0003K,\u0003\u0019AA\u0016\u00035I7OU3bgNLwM\\5oO\u0006!1\u000f^;c)\u001d9\u00181^A{\u0003\u007fDq!!<'\u0001\u0004\ty/A\u0005qCJ$\u0018\u000e^5p]B\u0019a*!=\n\u0007\u0005MxJA\u0005QCJ$\u0018\u000e^5p]\"9\u0011q\u001f\u0014A\u0002\u0005e\u0018A\u0004:fa2L7-Y'b]\u0006<WM\u001d\t\u0004q\u0005m\u0018bAA\u007fS\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\bb\u0002B\u0001M\u0001\u0007!1A\u0001\u0004Y><\u0007\u0003\u0002B\u0003\u0005\u0013i!Aa\u0002\u000b\u0007\t\u00051&\u0003\u0003\u0003\f\t\u001d!a\u0001'pO\u0006a2.\u00194lC\u000e{gNZ5h\u001d>$&/\u001e8dCR,wJ\u001c$fi\u000eDWC\u0001B\t!\rA$1C\u0005\u0004\u0005+I#aC&bM.\f7i\u001c8gS\u001e\u0004")
public class ReplicaFetcherThreadTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions kafka$server$ReplicaFetcherThreadTest$$failedPartitions = new FailedPartitions();

    private TopicPartition t1p0() {
        return this.t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    public BrokerEndPoint kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint() {
        return this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint;
    }

    public FailedPartitions kafka$server$ReplicaFetcherThreadTest$$failedPartitions() {
        return this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions;
    }

    private InitialFetchState initialFetchState(long fetchOffset, int leaderEpoch) {
        BrokerEndPoint x$1 = new BrokerEndPoint(0, "localhost", 9092);
        return new InitialFetchState(x$1, leaderEpoch, fetchOffset);
    }

    private int initialFetchState$default$2() {
        return 1;
    }

    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.mock(ReplicaManager.class);
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay((Object[])new Object[]{replicaManager});
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option)None$.MODULE$);
        Assertions.assertEquals((short)ApiKeys.FETCH.latestVersion(), (short)thread.fetchRequestVersion());
        Assertions.assertEquals((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (short)thread.offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals((short)ApiKeys.LIST_OFFSETS.latestVersion(), (short)thread.listOffsetRequestVersion());
    }

    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)None$.MODULE$).once();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(3);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsets = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState(0L, 1))})));
        this.assertPartitionStates((AbstractFetcherThread)thread, false, true, false);
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        EasyMock.verify((Object[])new Object[]{logManager});
    }

    public void assertPartitionStates(AbstractFetcherThread fetcher, boolean shouldBeReadyForFetch, boolean shouldBeTruncatingLog, boolean shouldBeDelayed) {
        ((List)package$.MODULE$.List().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.t1p0(), this.t1p1(), this.t2p1()}))).foreach((Function1 & Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$assertPartitionStates$1(fetcher, shouldBeReadyForFetch, shouldBeTruncatingLog, shouldBeDelayed, tp);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        EasyMock.expect((Object)mockBlockingSend.sendRequest((AbstractRequest.Builder)EasyMock.anyObject())).andThrow((Throwable)new NullPointerException()).once();
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.mock(ReplicaManager.class);
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay((Object[])new Object[]{mockBlockingSend, replicaManager});
        Map result = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), null, (Option)new Some((Object)mockBlockingSend)).fetchEpochEndOffsets((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p0().partition()).setLeaderEpoch(0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p1().partition()).setLeaderEpoch(0))})));
        Assertions.assertEquals((Object)((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))), (Object)result, (String)"results from leader epoch request should have undefined offset");
        EasyMock.verify((Object[])new Object[]{mockBlockingSend});
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        this.verifyFetchLeaderEpochOnFirstFetch((ApiVersion)KAFKA_2_6_IV0$.MODULE$, 1);
    }

    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        this.verifyFetchLeaderEpochOnFirstFetch(ApiVersion$.MODULE$.latestVersion(), 0);
    }

    private void verifyFetchLeaderEpochOnFirstFetch(ApiVersion ibp, int epochFetchCount) {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), ibp.version());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        partition.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, partition, log});
        java.util.Map offsets = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1))})));
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        EasyMock.verify((Object[])new Object[]{logManager});
    }

    private int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        return 1;
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 1))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsetsReply = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), leaderEpoch, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t2p1(), leaderEpoch, 172L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState(0L, 1))})));
        thread.doWork();
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)156)), (String)new StringBuilder(58).append("Expected ").append(this.t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)172)), (String)new StringBuilder(58).append("Expected ").append(this.t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpochAtFollower = 5;
        int leaderEpochAtLeader = 4;
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 3))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpochAtFollower))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpochAtLeader)).andReturn((Object)None$.MODULE$).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsetsReply = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), leaderEpochAtLeader, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t2p1(), leaderEpochAtLeader, 202L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState(0L, 1))})));
        thread.doWork();
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)156)), (String)new StringBuilder(58).append("Expected ").append(this.t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)initialLEO)), (String)new StringBuilder(55).append("Expected ").append(this.t2p1()).append(" to truncate to offset ").append(initialLEO).append(" (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(4)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(3)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsets = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), 4, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 4, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1))})));
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)0, (int)mockNetwork.fetchCount());
        java.util.Map nextOffsets = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), 3, 101L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 3, 102L))}))).asJava();
        mockNetwork.setOffsetsForNextResponse(nextOffsets);
        thread.doWork();
        Assertions.assertEquals((int)2, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((mockNetwork.lastUsedOffsetForLeaderEpochVersion() >= 3 ? 1 : 0) != 0, (String)"OffsetsForLeaderEpochRequest version.");
        thread.doWork();
        Assertions.assertEquals((int)2, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)102)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)101)), (String)new StringBuilder(58).append("Expected ").append(this.t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createNiceMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        ObjectRef latestLogEpoch = ObjectRef.create((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)115L)).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andAnswer(() -> (Option)latestLogEpoch$1.elem).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(4)).andReturn((Object)new Some((Object)new OffsetAndEpoch(149L, 4))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(3)).andReturn((Object)new Some((Object)new OffsetAndEpoch(129L, 2))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(2)).andReturn((Object)new Some((Object)new OffsetAndEpoch(119L, 1))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread(this, config, replicaManager, quota, mockNetwork){

            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long fetchOffset, FetchResponse.PartitionData<Records> partitionData) {
                return None$.MODULE$;
            }
        };
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(initialLEO, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(initialLEO, 1))})));
        scala.collection.immutable.Set partitions = (scala.collection.immutable.Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.t1p0(), this.t1p1()}));
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        partitions.foreach((Function1 & Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(thread, tp);
            return BoxedUnit.UNIT;
        });
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponse.PartitionData<Records>>)((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))}))));
        latestLogEpoch.elem = new Some((Object)BoxesRunTime.boxToInteger((int)4));
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)140)), (String)new StringBuilder(58).append("Expected ").append(this.t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)141)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(thread, tp);
            return BoxedUnit.UNIT;
        });
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponse.PartitionData<Records>>)((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))}))));
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)129)), (String)new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(thread, tp);
            return BoxedUnit.UNIT;
        });
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponse.PartitionData<Records>>)((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))}))));
        latestLogEpoch.elem = None$.MODULE$;
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)4, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)119)), (String)new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(thread, tp);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        props.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(4)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(3)).andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsets = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), -1, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), -1, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1))})));
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        Assertions.assertEquals((int)0, (int)mockNetwork.lastUsedOffsetForLeaderEpochVersion(), (String)"OffsetsForLeaderEpochRequest version.");
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)155)), (String)new StringBuilder(58).append("Expected ").append(this.t1p0()).append(" to truncate to offset 155 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)143)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 143 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int initialFetchOffset = 100;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)initialFetchOffset)).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).times(2);
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsetsReply = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), -1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(initialFetchOffset, 1))})));
        thread.doWork();
        Assertions.assertEquals((long)initialFetchOffset, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 5;
        int highWaterMark = 100;
        int initialLeo = 300;
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)highWaterMark)).anyTimes();
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLeo, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLeo)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsetsReply = CollectionConverters$.MODULE$.MutableMapHasAsJava((scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1))})));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)x$1 -> thread.doWork());
        Assertions.assertEquals((int)0, (int)truncated.getValues().size());
        offsetsReply.put(this.t1p0(), this.newOffsetForLeaderPartitionResult(this.t1p0(), leaderEpoch, 156L));
        thread.doWork();
        Assertions.assertEquals((long)156L, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        int leaderEpoch = 4;
        partition.truncateTo(0L, false);
        EasyMock.expect((Object)BoxedUnit.UNIT).times(2);
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)0L)).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsetsReply = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1))})));
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$), (Object)thread.fetchState(this.t1p0()).map((Function1 & Serializable)x$2 -> x$2.state()));
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$), (Object)thread.fetchState(this.t1p1()).map((Function1 & Serializable)x$3 -> x$3.state()));
        thread.doWork();
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$), (Object)thread.fetchState(this.t1p0()).map((Function1 & Serializable)x$4 -> x$4.state()));
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$), (Object)thread.fetchState(this.t1p1()).map((Function1 & Serializable)x$5 -> x$5.state()));
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        int initialLEO = 100;
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).andReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2))).anyTimes();
        EasyMock.expect((Object)log.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5))).anyTimes();
        EasyMock.expect((Object)log.endOffsetForEpoch(5)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, 5))).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException((TopicPartition)EasyMock.anyObject(TopicPartition.class))).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(partition, replicaManager, log);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, partition, log});
        java.util.Map offsetsReply = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p0(), 5, 52L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 5, 49L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1))})));
        TopicPartition partitionThatBecameLeader = this.t1p0();
        mockNetwork.setEpochRequestCallback((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> thread.removePartitions((Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{partitionThatBecameLeader}))));
        thread.doWork();
        Assertions.assertEquals((long)49L, (long)BoxesRunTime.unboxToLong((Object)truncateToCapture.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        mockBlockingSend.initiateClose();
        EasyMock.expect((Object)BoxedUnit.UNIT).andThrow((Throwable)new IllegalArgumentException()).once();
        mockBlockingSend.close();
        EasyMock.expect((Object)BoxedUnit.UNIT).andThrow((Throwable)new IllegalStateException()).once();
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.mock(ReplicaManager.class);
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn(EasyMock.mock(BrokerTopicStats.class));
        EasyMock.replay((Object[])new Object[]{mockBlockingSend, replicaManager});
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), null, (Option)new Some((Object)mockBlockingSend));
        thread.start();
        thread.initiateShutdown();
        thread.awaitShutdown();
        EasyMock.verify((Object[])new Object[]{mockBlockingSend});
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        this.assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        this.assertProcessPartitionDataWhen(false);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition tp, int leaderEpoch, long endOffset) {
        return this.newOffsetForLeaderPartitionResult(tp, Errors.NONE, leaderEpoch, endOffset);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition tp, Errors error, int leaderEpoch, long endOffset) {
        return new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(tp.partition()).setErrorCode(error.code()).setLeaderEpoch(leaderEpoch).setEndOffset(endOffset);
    }

    /*
     * WARNING - void declaration
     */
    private void assertProcessPartitionDataWhen(boolean isReassigning) {
        MemoryRecords memoryRecords;
        MemoryRecords memoryRecords2;
        void withRecords_records;
        void withRecords_timestampType;
        MemoryRecords memoryRecords3;
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)EasyMock.createNiceMock(BlockingSend.class);
        Log log = (Log)EasyMock.createNiceMock(Log.class);
        Partition partition = (Partition)EasyMock.createNiceMock(Partition.class);
        EasyMock.expect((Object)partition.localLogOrException()).andReturn((Object)log);
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)partition.isReassigning())).andReturn((Object)BoxesRunTime.boxToBoolean((boolean)isReassigning));
        EasyMock.expect((Object)BoxesRunTime.boxToBoolean((boolean)partition.isAddingLocalReplica())).andReturn((Object)BoxesRunTime.boxToBoolean((boolean)isReassigning));
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        EasyMock.expect((Object)replicaManager.getPartitionOrException((TopicPartition)EasyMock.anyObject())).andReturn((Object)partition);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        EasyMock.expect((Object)replicaManager.brokerTopicStats()).andReturn((Object)brokerTopicStats).anyTimes();
        ReplicaQuota replicaQuota = (ReplicaQuota)EasyMock.createNiceMock(ReplicaQuota.class);
        EasyMock.replay((Object[])new Object[]{mockBlockingSend, replicaManager, partition, log, replicaQuota});
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.kafka$server$ReplicaFetcherThreadTest$$brokerEndPoint(), config, this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), replicaQuota, (Option)new Some((Object)mockBlockingSend));
        SimpleRecord[] simpleRecordArray = new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))};
        CompressionType withRecords_compressionType = CompressionType.NONE;
        byte withRecords_magic = 2;
        TimestampType timestampType = TimestampType.CREATE_TIME;
        long withRecords_initialOffset = 0L;
        MemoryRecords memoryRecords4 = memoryRecords3 = MemoryRecords.withRecords((byte)withRecords_magic, (long)withRecords_initialOffset, (CompressionType)withRecords_compressionType, (TimestampType)withRecords_timestampType, (long)-1L, (short)-1, (int)-1, (int)-1, (boolean)false, (SimpleRecord[])withRecords_records);
        timestampType = null;
        memoryRecords3 = null;
        MemoryRecords memoryRecords5 = memoryRecords2 = memoryRecords4;
        memoryRecords2 = null;
        MemoryRecords memoryRecords6 = memoryRecords = memoryRecords5;
        Object var13_12 = null;
        simpleRecordArray = null;
        memoryRecords = null;
        MemoryRecords records = memoryRecords6;
        FetchResponse.PartitionData partitionData = new FetchResponse.PartitionData(Errors.NONE, 0L, 0L, 0L, Optional.empty(), Collections.emptyList(), (BaseRecords)records);
        thread.processPartitionData(this.t1p0(), 0L, partitionData);
        if (isReassigning) {
            Assertions.assertEquals((long)records.sizeInBytes(), (long)((Meter)brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assertions.assertEquals((long)0L, (long)((Meter)brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assertions.assertEquals((long)records.sizeInBytes(), (long)((Meter)brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, Log log) {
        EasyMock.expect((Object)replicaManager.localLogOrException(this.t1p0())).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0())).andReturn((Object)partition).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException(this.t1p1())).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p1())).andReturn((Object)partition).anyTimes();
        EasyMock.expect((Object)replicaManager.localLogOrException(this.t2p1())).andReturn((Object)log).anyTimes();
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t2p1())).andReturn((Object)partition).anyTimes();
    }

    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1);
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        return KafkaConfig$.MODULE$.fromProps(props);
    }

    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread fetcher$1, boolean shouldBeReadyForFetch$1, boolean shouldBeTruncatingLog$1, boolean shouldBeDelayed$1, TopicPartition tp) {
        Assertions.assertTrue((boolean)fetcher$1.fetchState(tp).isDefined());
        PartitionFetchState fetchState = (PartitionFetchState)fetcher$1.fetchState(tp).get();
        Assertions.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)shouldBeReadyForFetch$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isReadyForFetch()), (String)new StringBuilder(39).append("Partition ").append(tp).append(" should").append((Object)(!shouldBeReadyForFetch$1 ? " NOT" : "")).append(" be ready for fetching").toString());
        Assertions.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)shouldBeTruncatingLog$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isTruncating()), (String)new StringBuilder(39).append("Partition ").append(tp).append(" should").append((Object)(!shouldBeTruncatingLog$1 ? " NOT" : "")).append(" be truncating its log").toString());
        Assertions.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)shouldBeDelayed$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isDelayed()), (String)new StringBuilder(28).append("Partition ").append(tp).append(" should").append((Object)(!shouldBeDelayed$1 ? " NOT" : "")).append(" be delayed").toString());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }

    private static final FetchResponse.PartitionData partitionData$1(FetchResponseData.EpochEndOffset divergingEpoch) {
        return new FetchResponse.PartitionData(Errors.NONE, 0L, 0L, 0L, Optional.empty(), Collections.emptyList(), Optional.of(divergingEpoch), (BaseRecords)MemoryRecords.EMPTY);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }
}

