/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Optional;
import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.cluster.Replica;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.Fetching$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.OffsetAndEpoch;
import kafka.server.PartitionFetchState;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsManager;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.Assert;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005Ud\u0001\u0002\u0010 \u0001\u0011BQa\u000b\u0001\u0005\u00021Bqa\f\u0001C\u0002\u0013%\u0001\u0007\u0003\u0004=\u0001\u0001\u0006I!\r\u0005\b{\u0001\u0011\r\u0011\"\u00031\u0011\u0019q\u0004\u0001)A\u0005c!9q\b\u0001b\u0001\n\u0013\u0001\u0004B\u0002!\u0001A\u0003%\u0011\u0007C\u0004B\u0001\u0001\u0007I\u0011\u0002\"\t\u000f\u0019\u0003\u0001\u0019!C\u0005\u000f\"1Q\n\u0001Q!\n\rCqA\u0014\u0001C\u0002\u0013%q\n\u0003\u0004W\u0001\u0001\u0006I\u0001\u0015\u0005\u0006/\u0002!I\u0001\u0017\u0005\bM\u0002\t\n\u0011\"\u0003h\u0011\u0015\u0011\b\u0001\"\u0001t\u0011\u0015Y\b\u0001\"\u0001t\u0011\u0015i\b\u0001\"\u0001\u007f\u0011\u0019\t)\u0002\u0001C\u0001g\"1\u0011\u0011\u0004\u0001\u0005\u0002MDa!!\b\u0001\t\u0003\u0019\bBBA\u0011\u0001\u0011\u00051\u000f\u0003\u0004\u0002&\u0001!\ta\u001d\u0005\u0007\u0003S\u0001A\u0011A:\t\r\u00055\u0002\u0001\"\u0001t\u0011\u0019\t\t\u0004\u0001C\u0001g\"1\u0011Q\u0007\u0001\u0005\u0002MDa!!\u000f\u0001\t\u0003\u0019\bBBA\u001f\u0001\u0011\u00051\u000fC\u0004\u0002B\u0001!\t!a\u0011\u00031I+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1e)\u0016\u001cHO\u0003\u0002!C\u000511/\u001a:wKJT\u0011AI\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001Q\u0005\u0005\u0002'S5\tqEC\u0001)\u0003\u0015\u00198-\u00197b\u0013\tQsE\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u00035\u0002\"A\f\u0001\u000e\u0003}\tA\u0001^\u0019qaU\t\u0011\u0007\u0005\u00023u5\t1G\u0003\u00025k\u000511m\\7n_:T!A\t\u001c\u000b\u0005]B\u0014AB1qC\u000eDWMC\u0001:\u0003\ry'oZ\u0005\u0003wM\u0012a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.A\u0003ucA\u0004\u0004%\u0001\u0003ucA\f\u0014!\u0002;2aF\u0002\u0013\u0001\u0002;3aF\nQ\u0001\u001e\u001aqc\u0001\na\u0001^8GC&dW#A\"\u0011\u0005\u0019\"\u0015BA#(\u0005\u001d\u0011un\u001c7fC:\f!\u0002^8GC&dw\fJ3r)\tA5\n\u0005\u0002'\u0013&\u0011!j\n\u0002\u0005+:LG\u000fC\u0004M\u0013\u0005\u0005\t\u0019A\"\u0002\u0007a$\u0013'A\u0004u_\u001a\u000b\u0017\u000e\u001c\u0011\u0002\u001d\t\u0014xn[3s\u000b:$\u0007k\\5oiV\t\u0001\u000b\u0005\u0002R)6\t!K\u0003\u0002TC\u000591\r\\;ti\u0016\u0014\u0018BA+S\u00059\u0011%o\\6fe\u0016sG\rU8j]R\fqB\u0019:pW\u0016\u0014XI\u001c3Q_&tG\u000fI\u0001\u000f_\u001a47/\u001a;B]\u0012,\u0005o\\2i)\rIF,\u0019\t\u0003]iK!aW\u0010\u0003\u001d=3gm]3u\u0003:$W\t]8dQ\")Q,\u0004a\u0001=\u0006Ya-\u001a;dQ>3gm]3u!\t1s,\u0003\u0002aO\t!Aj\u001c8h\u0011\u001d\u0011W\u0002%AA\u0002\r\f1\u0002\\3bI\u0016\u0014X\t]8dQB\u0011a\u0005Z\u0005\u0003K\u001e\u00121!\u00138u\u0003aygMZ:fi\u0006sG-\u00129pG\"$C-\u001a4bk2$HEM\u000b\u0002Q*\u00121-[\u0016\u0002UB\u00111\u000e]\u0007\u0002Y*\u0011QN\\\u0001\nk:\u001c\u0007.Z2lK\u0012T!a\\\u0014\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0002rY\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002QMDw.\u001e7e'\u0016tG\rT1uKN$(+Z9vKN$h+\u001a:tS>t7OQ=EK\u001a\fW\u000f\u001c;\u0015\u0003!C#aD;\u0011\u0005YLX\"A<\u000b\u0005aD\u0014!\u00026v]&$\u0018B\u0001>x\u0005\u0011!Vm\u001d;\u0002\u0001NDw.\u001e7e\r\u0016$8\r\u001b'fC\u0012,'/\u00129pG\"\u0014V-];fgRLe\rT1ti\u0016\u0003xn\u00195EK\u001aLg.\u001a3G_J\u001cv.\\3QCJ$\u0018\u000e^5p]ND#\u0001E;\u0002+\u0005\u001c8/\u001a:u!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;fgRA\u0001j`A\u0005\u0003\u001b\t\t\u0002C\u0004\u0002\u0002E\u0001\r!a\u0001\u0002\u000f\u0019,Go\u00195feB\u0019a&!\u0002\n\u0007\u0005\u001dqDA\u000bBEN$(/Y2u\r
\u0016$8\r[3s)\"\u0014X-\u00193\t\r\u0005-\u0011\u00031\u0001D\u0003U\u0019\bn\\;mI\n+'+Z1es\u001a{'OR3uG\"Da!a\u0004\u0012\u0001\u0004\u0019\u0015!F:i_VdGMQ3UeVt7-\u0019;j]\u001edun\u001a\u0005\u0007\u0003'\t\u0002\u0019A\"\u0002\u001fMDw.\u001e7e\u0005\u0016$U\r\\1zK\u0012\fQe\u001d5pk2$\u0007*\u00198eY\u0016,\u0005pY3qi&|gN\u0012:p[\ncwnY6j]\u001e\u001cVM\u001c3)\u0005I)\u0018AP:i_VdGMR3uG\"dU-\u00193fe\u0016\u0003xn\u00195P]\u001aK'o\u001d;GKR\u001c\u0007n\u00148ms&3G*Z1eKJ,\u0005o\\2i\u0017:|wO\u001c+p\u0005>$\b\u000e\u000b\u0002\u0014k\u0006!4\u000f[8vY\u0012$&/\u001e8dCR,Gk\\(gMN,Go\u00159fG&4\u0017.\u001a3J]\u0016\u0003xn\u00195PM\u001a\u001cX\r\u001e*fgB|gn]3)\u0005Q)\u0018!T:i_VdG\r\u0016:v]\u000e\fG/\u001a+p\u001f\u001a47/\u001a;Ta\u0016\u001c\u0017NZ5fI&sW\t]8dQ>3gm]3u%\u0016\u001c\bo\u001c8tK&3gi\u001c7m_^,'\u000fS1t\u001d>luN]3Fa>\u001c\u0007n\u001d\u0015\u0003+U\f!j\u001d5pk2$g)\u001a;dQ2+\u0017\rZ3s\u000bB|7\r[*fG>tG\rV5nK&3G*Z1eKJ\u0014V\r\u001d7jKN<\u0016\u000e\u001e5Fa>\u001c\u0007NT8u\u0017:|wO\u001c+p\r>dGn\\<fe\"\u0012a#^\u00014g\"|W\u000f\u001c3Vg\u0016dU-\u00193fe\u0016sGm\u00144gg\u0016$\u0018JZ%oi\u0016\u0014(I]8lKJ4VM]:j_:\u0014U\r\\8xeAB#aF;\u0002\u0001NDw.\u001e7e)J,hnY1uKR{\u0017J\\5uS\u0006dg)\u001a;dQ>3gm]3u\u0013\u001adU-\u00193feJ+G/\u001e:ogVsG-\u001a4j]\u0016$wJ\u001a4tKRD#\u0001G;\u0002cMDw.\u001e7e!>dG.\u00138eK\u001aLg.\u001b;fYfLe\rT3bI\u0016\u0014(+\u001a;ve:\u001c\u0018I\\=Fq\u000e,\u0007\u000f^5p]\"\u0012\u0011$^\u0001,g\"|W\u000f\u001c3N_Z,\u0007+\u0019:uSRLwN\\:PkR|e\r\u0016:v]\u000e\fG/\u001b8h\u0019><7\u000b^1uK\"\u0012!$^\u00019g\"|W\u000f\u001c3GS2$XM\u001d)beRLG/[8og6\u000bG-\u001a'fC\u0012,'\u000fR;sS:<G*Z1eKJ,\u0005o\\2i%\u0016\fX/Z:uQ\tYR/\u0001%tQ>,H\u000eZ\"bi\u000eDW\t_2faRLwN\u001c$s_6\u0014En\\2lS:<7+\u001a8e/\",gn\u00155viRLgn\u001a#po:\u0014V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\"\u0012A$^\u0001\u0005gR,(\r\u0006\u0005\u0002F\u0005u\u0013qMA6!\u0019\t9%!\u0014\u0002R5\u0011\u0011\u0011\n\u0006\u0004\u0003\
u0017B\u0014\u0001C3bgflwnY6\n\t\u0005=\u0013\u0011\n\u0002\u0014\u0013\u0016C\b/Z2uCRLwN\\*fiR,'o\u001d\t\u0006M\u0005M\u0013qK\u0005\u0004\u0003+:#AB(qi&|g\u000eE\u0002R\u00033J1!a\u0017S\u0005%\u0001\u0016M\u001d;ji&|g\u000eC\u0004\u0002`u\u0001\r!!\u0019\u0002\u000fI,\u0007\u000f\\5dCB\u0019\u0011+a\u0019\n\u0007\u0005\u0015$KA\u0004SKBd\u0017nY1\t\u000f\u0005%T\u00041\u0001\u0002X\u0005I\u0001/\u0019:uSRLwN\u001c\u0005\b\u0003[j\u0002\u0019AA8\u00039\u0011X\r\u001d7jG\u0006l\u0015M\\1hKJ\u00042ALA9\u0013\r\t\u0019h\b\u0002\u000f%\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s\u0001")
public class ReplicaFetcherThreadTest {
    // Fixture partition: topic "topic1", partition 0.
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    // Fixture partition: topic "topic1", partition 1.
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    // Fixture partition: topic "topic2", partition 1.
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    // Mutable flag, initially false. Only its accessor pair (toFail / toFail_$eq) is visible in
    // this part of the file; NOTE(review): confirm which test toggles it.
    private boolean toFail = false;
    // Endpoint passed to the ReplicaFetcherThread and mock network instances built by the tests.
    // The arguments are presumably (broker id, host, port) -- TODO confirm against BrokerEndPoint.
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);

    /** Accessor for the {@code t1p0} fixture partition (topic "topic1", partition 0). */
    private TopicPartition t1p0() {
        return t1p0;
    }

    /** Accessor for the {@code t1p1} fixture partition (topic "topic1", partition 1). */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /** Accessor for the {@code t2p1} fixture partition (topic "topic2", partition 1). */
    private TopicPartition t2p1() {
        return t2p1;
    }

    /** Reads the current value of the mutable {@code toFail} flag. */
    private boolean toFail() {
        return toFail;
    }

    /**
     * Scala-style setter for the {@code toFail} flag.
     *
     * @param x$1 the new flag value
     */
    private void toFail_$eq(boolean x$1) {
        toFail = x$1;
    }

    /** Accessor for the shared {@code brokerEndPoint} fixture. */
    private BrokerEndPoint brokerEndPoint() {
        return brokerEndPoint;
    }

    /**
     * Wraps a fetch offset and a leader epoch in an {@link OffsetAndEpoch}.
     *
     * @param fetchOffset offset passed as the first constructor argument
     * @param leaderEpoch epoch passed as the second constructor argument
     * @return a new {@code OffsetAndEpoch} built from the two arguments
     */
    private OffsetAndEpoch offsetAndEpoch(long fetchOffset, int leaderEpoch) {
        OffsetAndEpoch pair = new OffsetAndEpoch(fetchOffset, leaderEpoch);
        return pair;
    }

    /**
     * Compiler-generated default for the second ({@code leaderEpoch}) argument of
     * {@code offsetAndEpoch}.
     *
     * @return the default leader epoch, 1
     */
    private int offsetAndEpoch$default$2() {
        final int defaultLeaderEpoch = 1;
        return defaultLeaderEpoch;
    }

    /**
     * Builds a {@link ReplicaFetcherThread} from a broker config created by {@code TestUtils}
     * (node id 1, "localhost:1234", every other argument left at its default) and asserts that
     * the thread reports the latest FETCH, OFFSET_FOR_LEADER_EPOCH and LIST_OFFSETS versions.
     */
    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(
            1,
            "localhost:1234",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // No replica manager (null) and no custom network client (None) are supplied here.
        ReplicaFetcherThread fetcherThread = new ReplicaFetcherThread(
            "bob",
            0,
            this.brokerEndPoint(),
            brokerConfig,
            null,
            new Metrics(),
            (Time)new SystemTime(),
            (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$,
            (Option)None$.MODULE$);

        Assert.assertEquals((long)ApiKeys.FETCH.latestVersion(), (long)fetcherThread.fetchRequestVersion());
        Assert.assertEquals((long)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (long)fetcherThread.offsetForLeaderEpochRequestVersion());
        Assert.assertEquals((long)ApiKeys.LIST_OFFSETS.latestVersion(), (long)fetcherThread.listOffsetRequestVersion());
    }

    /**
     * Registers t1p0, t1p1 and t2p1 with a fetcher whose mocked replica returns
     * {@code Some(5)} from {@code latestEpoch()} twice and {@code None} once. The mock
     * network is seeded with epoch end offsets for t1p0 and t1p1 only.
     *
     * <p>Asserts that the partitions start out in the truncating state, that exactly one
     * epoch request is counted over three {@code doWork()} rounds while the fetch count
     * grows by one per round, and that the partitions are ready for fetch after each round.
     */
    @Test
    public void shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(
            1,
            "localhost:1234",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Collaborators (created in the same order as before).
        ReplicationQuotaManager quotaMock = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManagerMock = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsMock = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replicaMock = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partitionMock = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        final int epoch = 5;

        // Stubs on the replica: log end offset 0 and high watermark 0 for any number of calls.
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)replicaMock.logEndOffset()))
            .andReturn((Object)BoxesRunTime.boxToLong(0L))
            .anyTimes();
        EasyMock.expect((Object)replicaMock.highWatermark())
            .andReturn((Object)new LogOffsetMetadata(0L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
            .anyTimes();
        // latestEpoch(): Some(epoch) for the first two calls, None for the third.
        for (int i = 0; i < 2; ++i) {
            EasyMock.expect((Object)replicaMock.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger(epoch)))
                .once();
        }
        EasyMock.expect((Object)replicaMock.latestEpoch()).andReturn((Object)None$.MODULE$).once();
        EasyMock.expect((Object)replicaMock.endOffsetForEpoch(epoch))
            .andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, epoch)))
            .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.logManager()).andReturn((Object)logManagerMock).anyTimes();
        EasyMock.expect((Object)replicaManagerMock.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirsMock).anyTimes();
        this.stub(replicaMock, partitionMock, replicaManagerMock);

        // Void-method expectation as emitted by Scala: invoke the mock, then attach the call
        // count through expect(UNIT). The partition mock is not part of the replay list below.
        partitionMock.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.replay(replicaManagerMock, logManagerMock, quotaMock, replicaMock);

        // Epoch end offsets the mock network is seeded with; t2p1 has no entry.
        Tuple2[] leaderReplies = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(epoch, 1L)),
            new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(epoch, 1L))
        };
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$
            .mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])leaderReplies)))
            .asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcherThread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, replicaManagerMock, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaMock, (Option)new Some((Object)mockNetwork));

        Tuple2[] initialFetchStates = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
            new Tuple2((Object)this.t1p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
            new Tuple2((Object)this.t2p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcherThread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialFetchStates)));

        // Before any work is done: not ready for fetch, truncating, not delayed.
        this.assertPartitionStates((AbstractFetcherThread)fetcherThread, false, true, false);

        // Every round: still a single epoch request, one more fetch, partitions ready for fetch.
        for (int round = 1; round <= 3; ++round) {
            fetcherThread.doWork();
            Assert.assertEquals(1L, (long)mockNetwork.epochFetchCount());
            Assert.assertEquals((long)round, (long)mockNetwork.fetchCount());
            this.assertPartitionStates((AbstractFetcherThread)fetcherThread, true, false, false);
        }
        EasyMock.verify(logManagerMock);
    }

    public void assertPartitionStates(AbstractFetcherThread fetcher, boolean shouldBeReadyForFetch, boolean shouldBeTruncatingLog, boolean shouldBeDelayed) {
        new .colon.colon((Object)this.t1p0(), (List)new .colon.colon((Object)this.t1p1(), (List)new .colon.colon((Object)this.t2p1(), (List)Nil$.MODULE$))).foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$assertPartitionStates$1(fetcher, shouldBeReadyForFetch, shouldBeTruncatingLog, shouldBeDelayed, tp);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Uses a {@link BlockingSend} mock that throws a {@link NullPointerException} from its
     * single expected {@code sendRequest} call, and asserts that
     * {@code fetchEpochEndOffsets} answers both requested partitions with an
     * {@code EpochEndOffset(UNKNOWN_SERVER_ERROR, -1, -1)}.
     */
    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(
            1,
            "localhost:1234",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // The only expected network call fails with an NPE.
        BlockingSend failingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        EasyMock.expect((Object)failingSend.sendRequest((AbstractRequest.Builder)EasyMock.anyObject()))
            .andThrow((Throwable)new NullPointerException())
            .once();
        EasyMock.replay(failingSend);

        ReplicaFetcherThread fetcherThread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, null, new Metrics(), (Time)new SystemTime(), null, (Option)new Some((Object)failingSend));

        Tuple2[] epochRequests = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0)),
            new Tuple2((Object)this.t1p1(), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0))
        };
        Map actual = fetcherThread.fetchEpochEndOffsets((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])epochRequests)));

        Tuple2[] expectedEntries = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L)),
            new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))
        };
        Map expected = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedEntries));

        Assert.assertEquals("results from leader epoch request should have undefined offset", (Object)expected, (Object)actual);
        EasyMock.verify(failingSend);
    }

    /**
     * Registers t1p0 and t1p1 with a fetcher whose mocked replica always reports
     * {@code Some(5)} as its latest epoch, with the mock network seeded with epoch end
     * offsets for both partitions.
     *
     * <p>Asserts that over three {@code doWork()} rounds the epoch request count stays at
     * one while the fetch count grows by one per round.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(
            1,
            "localhost:1234",
            testUtils.createBrokerConfig$default$3(),
            testUtils.createBrokerConfig$default$4(),
            testUtils.createBrokerConfig$default$5(),
            testUtils.createBrokerConfig$default$6(),
            testUtils.createBrokerConfig$default$7(),
            testUtils.createBrokerConfig$default$8(),
            testUtils.createBrokerConfig$default$9(),
            testUtils.createBrokerConfig$default$10(),
            testUtils.createBrokerConfig$default$11(),
            testUtils.createBrokerConfig$default$12(),
            testUtils.createBrokerConfig$default$13(),
            testUtils.createBrokerConfig$default$14(),
            testUtils.createBrokerConfig$default$15(),
            testUtils.createBrokerConfig$default$16(),
            testUtils.createBrokerConfig$default$17(),
            testUtils.createBrokerConfig$default$18());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Collaborators (created in the same order as before).
        LogManager logManagerMock = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsMock = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replicaMock = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partitionMock = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        final int epoch = 5;

        // Stubs: log end offset 0, high watermark 0, latest epoch always Some(epoch).
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)replicaMock.logEndOffset()))
            .andReturn((Object)BoxesRunTime.boxToLong(0L))
            .anyTimes();
        EasyMock.expect((Object)replicaMock.highWatermark())
            .andReturn((Object)new LogOffsetMetadata(0L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
            .anyTimes();
        EasyMock.expect((Object)replicaMock.latestEpoch())
            .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger(epoch)))
            .anyTimes();
        EasyMock.expect((Object)replicaMock.endOffsetForEpoch(epoch))
            .andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, epoch)))
            .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.logManager()).andReturn((Object)logManagerMock).anyTimes();
        EasyMock.expect((Object)replicaManagerMock.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirsMock).anyTimes();
        this.stub(replicaMock, partitionMock, replicaManagerMock);

        // Void-method expectation as emitted by Scala: invoke the mock, then attach the call
        // count through expect(UNIT). The partition mock is not part of the replay list below.
        partitionMock.truncateTo(EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.replay(replicaManagerMock, logManagerMock, replicaMock);

        // Epoch end offsets the mock network is seeded with.
        Tuple2[] leaderReplies = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(epoch, 1L)),
            new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(epoch, 1L))
        };
        java.util.Map offsets = (java.util.Map)JavaConverters$.MODULE$
            .mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])leaderReplies)))
            .asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcherThread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, replicaManagerMock, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option)new Some((Object)mockNetwork));

        Tuple2[] initialFetchStates = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
            new Tuple2((Object)this.t1p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcherThread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialFetchStates)));

        // Every round: still a single epoch request, one more fetch.
        for (int round = 1; round <= 3; ++round) {
            fetcherThread.doWork();
            Assert.assertEquals(1L, (long)mockNetwork.epochFetchCount());
            Assert.assertEquals((long)round, (long)mockNetwork.fetchCount());
        }
        EasyMock.verify(logManagerMock);
    }

    /**
     * Mocks a replica with log end offset 200, high watermark 199 and latest epoch 5, and
     * seeds the mock network with epoch-5 end offsets of 156 for t1p0 and 172 for t2p1.
     *
     * <p>After one {@code doWork()} the offsets captured from
     * {@code partition.truncateTo} must contain both 156 and 172.
     */
    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        // Collects every offset passed to partition.truncateTo.
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        Seq configs = (Seq)testUtils.createBrokerConfigs(
            1,
            "localhost:1234",
            testUtils.createBrokerConfigs$default$3(),
            testUtils.createBrokerConfigs$default$4(),
            testUtils.createBrokerConfigs$default$5(),
            testUtils.createBrokerConfigs$default$6(),
            testUtils.createBrokerConfigs$default$7(),
            testUtils.createBrokerConfigs$default$8(),
            testUtils.createBrokerConfigs$default$9(),
            testUtils.createBrokerConfigs$default$10(),
            testUtils.createBrokerConfigs$default$11(),
            testUtils.createBrokerConfigs$default$12(),
            testUtils.createBrokerConfigs$default$13(),
            testUtils.createBrokerConfigs$default$14())
            .map((Function1 & Serializable & scala.Serializable)brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps), Seq$.MODULE$.canBuildFrom());

        // Collaborators (created in the same order as before).
        ReplicationQuotaManager quotaMock = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManagerMock = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsMock = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replicaMock = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partitionMock = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManagerMock = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        final int epoch = 5;
        final int logEndOffsetAtStart = 200;

        // Void-method expectation as emitted by Scala: invoke the mock (capturing the offset
        // argument), then attach the call count through expect(UNIT).
        partitionMock.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();

        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)replicaMock.logEndOffset()))
            .andReturn((Object)BoxesRunTime.boxToLong((long)logEndOffsetAtStart))
            .anyTimes();
        EasyMock.expect((Object)replicaMock.highWatermark())
            .andReturn((Object)new LogOffsetMetadata((long)(logEndOffsetAtStart - 1), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
            .anyTimes();
        EasyMock.expect((Object)replicaMock.latestEpoch())
            .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger(epoch)))
            .anyTimes();
        EasyMock.expect((Object)replicaMock.endOffsetForEpoch(epoch))
            .andReturn((Object)new Some((Object)new OffsetAndEpoch((long)logEndOffsetAtStart, epoch)))
            .anyTimes();
        EasyMock.expect((Object)replicaManagerMock.logManager()).andReturn((Object)logManagerMock).anyTimes();
        EasyMock.expect((Object)replicaManagerMock.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirsMock).anyTimes();
        this.stub(replicaMock, partitionMock, replicaManagerMock);
        EasyMock.replay(replicaManagerMock, logManagerMock, quotaMock, replicaMock, partitionMock);

        // Epoch end offsets the mock network is seeded with.
        Tuple2[] leaderReplies = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(epoch, 156L)),
            new Tuple2((Object)this.t2p1(), (Object)new EpochEndOffset(epoch, 172L))
        };
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$
            .mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])leaderReplies)))
            .asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        KafkaConfig brokerConfig = (KafkaConfig)configs.apply(0);
        ReplicaFetcherThread fetcherThread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, replicaManagerMock, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaMock, (Option)new Some((Object)mockNetwork));

        Tuple2[] initialFetchStates = new Tuple2[]{
            new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
            new Tuple2((Object)this.t2p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcherThread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialFetchStates)));

        fetcherThread.doWork();

        // Scala view over the captured offsets. The lookups below pass a boxed Integer, as
        // the compiled Scala did; presumably this relies on Scala's numeric equality against
        // the captured long values -- TODO confirm.
        SeqLike truncationOffsets = (SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala();
        Assert.assertTrue(
            "Expected " + this.t1p0() + " to truncate to offset 156 (truncation offsets: " + truncateToCapture.getValues() + ")",
            truncationOffsets.contains((Object)BoxesRunTime.boxToInteger(156)));
        Assert.assertTrue(
            "Expected " + this.t2p1() + " to truncate to offset 172 (truncation offsets: " + truncateToCapture.getValues() + ")",
            truncationOffsets.contains((Object)BoxesRunTime.boxToInteger(172)));
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Seq configs = (Seq)TestUtils$.MODULE$.createBrokerConfigs(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14()).map((Function1 & Serializable & scala.Serializable)props -> KafkaConfig$.MODULE$.fromProps(props), Seq$.MODULE$.canBuildFrom());
        ReplicationQuotaManager quota = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpochAtFollower = 5;
        int leaderEpochAtLeader = 4;
        int initialLEO = 200;
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)replica.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)initialLEO)).anyTimes();
        EasyMock.expect((Object)replica.highWatermark()).andReturn((Object)new LogOffsetMetadata((long)(initialLEO - 3), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3())).anyTimes();
        EasyMock.expect((Object)replica.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpochAtFollower))).anyTimes();
        EasyMock.expect((Object)replica.endOffsetForEpoch(leaderEpochAtLeader)).andReturn((Object)None$.MODULE$).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        EasyMock.expect((Object)replicaManager.replicaAlterLogDirsManager()).andReturn((Object)replicaAlterLogDirsManager).anyTimes();
        this.stub(replica, partition, replicaManager);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quota, replica, partition});
        java.util.Map offsetsReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(leaderEpochAtLeader, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)new EpochEndOffset(leaderEpochAtLeader, 202L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)configs.apply(0), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option)new Some((Object)mockNetwork));
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertTrue((String)new StringBuilder(58).append("Expected ").append(this.t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)));
        Assert.assertTrue((String)new StringBuilder(55).append("Expected ").append(this.t2p1()).append(" to truncate to offset ").append(initialLEO).append(" (truncation offsets: ").append(truncateToCapture.getValues()).append(")").toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)initialLEO)));
    }

    /**
     * Runs a {@link ReplicaFetcherThread} for three {@code doWork()} passes against a mock network
     * that answers the first epoch request with epoch 4 and the next one with epoch 3.
     * <p>
     * Checks the epoch-request and fetch counters after every pass, the OffsetsForLeaderEpoch
     * request version (2), and that offsets 101 and 102 are among the captured
     * {@code truncateTo} arguments.
     */
    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        // Collects every offset passed to Partition.truncateTo.
        Capture truncationOffsets = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        KafkaConfig$ configFactory = KafkaConfig$.MODULE$;
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = configFactory.fromProps(testUtils.createBrokerConfig(1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18()));

        // Collaborators of the fetcher thread.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logs = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirs = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica followerReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition followerPartition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicas = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int logEndOffset = 200;

        // The expect(...) below configures the call recorded on the line above it,
        // so these two statements must stay adjacent.
        followerPartition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncationOffsets)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)followerReplica.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)logEndOffset))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.highWatermark())
                .andReturn((Object)new LogOffsetMetadata((long)(logEndOffset - 2), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.endOffsetForEpoch(4))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.endOffsetForEpoch(3))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)))
                .anyTimes();
        EasyMock.expect((Object)replicas.logManager()).andReturn((Object)logs).anyTimes();
        EasyMock.expect((Object)replicas.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirs).anyTimes();
        this.stub(followerReplica, followerPartition, replicas);
        EasyMock.replay(replicas, logs, quotaManager, followerReplica, followerPartition);

        // First epoch reply from the mocked leader: epoch 4 for both partitions.
        Tuple2[] firstReplyEntries = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(4, 155L)),
                new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(4, 143L))
        };
        Map firstReplyScala = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])firstReplyEntries));
        java.util.Map firstReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(firstReplyScala).asJava();

        ReplicaFetcherMockBlockingSend leaderEndpoint = new ReplicaFetcherMockBlockingSend(firstReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, replicas, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option)new Some((Object)leaderEndpoint));

        Tuple2[] initialPartitions = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
                new Tuple2((Object)this.t1p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialPartitions)));

        // Pass 1: one epoch request, no fetch yet.
        fetcher.doWork();
        Assert.assertEquals(1L, (long)leaderEndpoint.epochFetchCount());
        Assert.assertEquals(0L, (long)leaderEndpoint.fetchCount());

        // Second epoch reply: epoch 3 for both partitions.
        Tuple2[] secondReplyEntries = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(3, 101L)),
                new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(3, 102L))
        };
        Map secondReplyScala = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])secondReplyEntries));
        java.util.Map secondReply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(secondReplyScala).asJava();
        leaderEndpoint.setOffsetsForNextResponse(secondReply);

        // Pass 2: a second epoch request and the first fetch.
        fetcher.doWork();
        Assert.assertEquals(2L, (long)leaderEndpoint.epochFetchCount());
        Assert.assertEquals(1L, (long)leaderEndpoint.fetchCount());
        Assert.assertEquals("OffsetsForLeaderEpochRequest version.", 2L, (long)leaderEndpoint.lastUsedOffsetForLeaderEpochVersion());

        // Pass 3: only the fetch counter moves.
        fetcher.doWork();
        Assert.assertEquals(2L, (long)leaderEndpoint.epochFetchCount());
        Assert.assertEquals(2L, (long)leaderEndpoint.fetchCount());

        String t1p1Message = "Expected " + this.t1p1() + " to truncate to offset 102 (truncation offsets: " + truncationOffsets.getValues() + ")";
        Assert.assertTrue(t1p1Message, ((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncationOffsets.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)102)));
        String t1p0Message = "Expected " + this.t1p0() + " to truncate to offset 101 (truncation offsets: " + truncationOffsets.getValues() + ")";
        Assert.assertTrue(t1p0Message, ((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncationOffsets.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)101)));
    }

    /**
     * Runs a {@link ReplicaFetcherThread} configured with inter-broker protocol version
     * {@code "0.11.0"} for two {@code doWork()} passes against a mock network whose epoch reply
     * carries epoch -1 with end offsets 155 and 143.
     * <p>
     * Checks the epoch-request and fetch counters after each pass, the OffsetsForLeaderEpoch
     * request version (0), and that offsets 155 and 143 are among the captured
     * {@code truncateTo} arguments.
     */
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        // Collects every offset passed to Partition.truncateTo.
        Capture truncationOffsets = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18());
        // Pin the inter-broker protocol to the old version under test.
        brokerProps.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Collaborators of the fetcher thread.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logs = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirs = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica followerReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition followerPartition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicas = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int logEndOffset = 200;

        // The expect(...) below configures the call recorded on the line above it,
        // so these two statements must stay adjacent.
        followerPartition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncationOffsets)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)followerReplica.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)logEndOffset))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.highWatermark())
                .andReturn((Object)new LogOffsetMetadata((long)(logEndOffset - 2), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.endOffsetForEpoch(4))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.endOffsetForEpoch(3))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)))
                .anyTimes();
        EasyMock.expect((Object)replicas.logManager()).andReturn((Object)logs).anyTimes();
        EasyMock.expect((Object)replicas.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirs).anyTimes();
        this.stub(followerReplica, followerPartition, replicas);
        EasyMock.replay(replicas, logs, quotaManager, followerReplica, followerPartition);

        // Epoch reply from the mocked leader: epoch -1 with only an end offset per partition.
        Tuple2[] replyEntries = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(-1, 155L)),
                new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(-1, 143L))
        };
        Map replyScala = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])replyEntries));
        java.util.Map reply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(replyScala).asJava();

        ReplicaFetcherMockBlockingSend leaderEndpoint = new ReplicaFetcherMockBlockingSend(reply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, replicas, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option)new Some((Object)leaderEndpoint));

        Tuple2[] initialPartitions = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
                new Tuple2((Object)this.t1p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialPartitions)));

        // Pass 1: one epoch request and one fetch.
        fetcher.doWork();
        Assert.assertEquals(1L, (long)leaderEndpoint.epochFetchCount());
        Assert.assertEquals(1L, (long)leaderEndpoint.fetchCount());
        Assert.assertEquals("OffsetsForLeaderEpochRequest version.", 0L, (long)leaderEndpoint.lastUsedOffsetForLeaderEpochVersion());

        // Pass 2: only the fetch counter moves.
        fetcher.doWork();
        Assert.assertEquals(1L, (long)leaderEndpoint.epochFetchCount());
        Assert.assertEquals(2L, (long)leaderEndpoint.fetchCount());

        String t1p0Message = "Expected " + this.t1p0() + " to truncate to offset 155 (truncation offsets: " + truncationOffsets.getValues() + ")";
        Assert.assertTrue(t1p0Message, ((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncationOffsets.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)155)));
        String t1p1Message = "Expected " + this.t1p1() + " to truncate to offset 143 (truncation offsets: " + truncationOffsets.getValues() + ")";
        Assert.assertTrue(t1p1Message, ((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncationOffsets.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)143)));
    }

    /**
     * Runs a single {@code doWork()} pass of a {@link ReplicaFetcherThread} whose mocked leader
     * replies with {@code EpochEndOffset(-1, -1)} for the one tracked partition, then asserts that
     * the captured {@code truncateTo} offset equals the initial fetch offset (100).
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        // Collects every offset passed to Partition.truncateTo.
        Capture truncationOffsets = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        Seq brokerConfigs = (Seq)testUtils.createBrokerConfigs(1, "localhost:1234",
                testUtils.createBrokerConfigs$default$3(), testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(), testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(), testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(), testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(), testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13(), testUtils.createBrokerConfigs$default$14())
                .map((Function1 & Serializable & scala.Serializable)brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps), Seq$.MODULE$.canBuildFrom());

        // Collaborators of the fetcher thread.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logs = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirs = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica followerReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition followerPartition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicas = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int startOffset = 100;
        int logEndOffset = 300;

        // The expect(...) below configures the call recorded on the line above it,
        // so these two statements must stay adjacent.
        followerPartition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncationOffsets)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)followerReplica.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)logEndOffset))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.highWatermark())
                .andReturn((Object)new LogOffsetMetadata((long)startOffset, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        // No call-count modifier on this expectation, unlike the others in this method.
        EasyMock.expect((Object)followerReplica.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        EasyMock.expect((Object)replicas.logManager()).andReturn((Object)logs).anyTimes();
        EasyMock.expect((Object)replicas.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirs).anyTimes();
        this.stub(followerReplica, followerPartition, replicas);
        EasyMock.replay(replicas, logs, quotaManager, followerReplica, followerPartition);

        // Epoch reply from the mocked leader: epoch -1 and end offset -1.
        Tuple2[] replyEntries = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(-1, -1L))
        };
        Map replyScala = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])replyEntries));
        java.util.Map reply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(replyScala).asJava();

        ReplicaFetcherMockBlockingSend leaderEndpoint = new ReplicaFetcherMockBlockingSend(reply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)brokerConfigs.apply(0), replicas, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option)new Some((Object)leaderEndpoint));

        Tuple2[] initialPartitions = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(startOffset, this.offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialPartitions)));

        fetcher.doWork();

        Assert.assertEquals((long)startOffset, (long)BoxesRunTime.unboxToLong((Object)truncationOffsets.getValue()));
    }

    /**
     * Runs a {@link ReplicaFetcherThread} against a mock network whose epoch reply holds an error
     * for both partitions ({@code NOT_LEADER_FOR_PARTITION} and {@code UNKNOWN_SERVER_ERROR}).
     * <p>
     * After four {@code doWork()} passes it asserts that nothing was truncated. It then replaces the
     * reply for the first partition with {@code EpochEndOffset(5, 156)}, runs one more pass and
     * asserts that the captured {@code truncateTo} offset is 156.
     */
    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        // Collects every offset passed to Partition.truncateTo.
        Capture truncationOffsets = EasyMock.newCapture((CaptureType)CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        Seq brokerConfigs = (Seq)testUtils.createBrokerConfigs(1, "localhost:1234",
                testUtils.createBrokerConfigs$default$3(), testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(), testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(), testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(), testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(), testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13(), testUtils.createBrokerConfigs$default$14())
                .map((Function1 & Serializable & scala.Serializable)brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps), Seq$.MODULE$.canBuildFrom());

        // Collaborators of the fetcher thread.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logs = (LogManager)EasyMock.createMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirs = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica followerReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition followerPartition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicas = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int followerEpoch = 5;
        int highWatermarkOffset = 100;
        int logEndOffset = 300;

        EasyMock.expect((Object)followerReplica.highWatermark())
                .andReturn((Object)new LogOffsetMetadata((long)highWatermarkOffset, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        // The expect(...) below configures the call recorded on the line above it,
        // so these two statements must stay adjacent.
        followerPartition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncationOffsets)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)followerReplica.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)logEndOffset))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)followerEpoch)))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.endOffsetForEpoch(followerEpoch))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch((long)logEndOffset, followerEpoch)))
                .anyTimes();
        EasyMock.expect((Object)replicas.logManager()).andReturn((Object)logs).anyTimes();
        EasyMock.expect((Object)replicas.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirs).anyTimes();
        this.stub(followerReplica, followerPartition, replicas);
        EasyMock.replay(replicas, logs, quotaManager, followerReplica, followerPartition);

        // Built from a mutable Scala map because the reply is modified later in the test.
        Tuple2[] errorEntries = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, -1, -1L)),
                new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1, -1L))
        };
        scala.collection.mutable.Map replyScala = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])errorEntries));
        java.util.Map reply = (java.util.Map)JavaConverters$.MODULE$.mutableMapAsJavaMapConverter(replyScala).asJava();

        ReplicaFetcherMockBlockingSend leaderEndpoint = new ReplicaFetcherMockBlockingSend(reply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), (KafkaConfig)brokerConfigs.apply(0), replicas, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option)new Some((Object)leaderEndpoint));

        Tuple2[] initialPartitions = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
                new Tuple2((Object)this.t1p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialPartitions)));

        // Four passes (0 through 3 inclusive) while the leader keeps returning errors.
        for (int pass = 0; pass <= 3; pass++) {
            fetcher.doWork();
        }
        Assert.assertEquals(0L, (long)truncationOffsets.getValues().size());

        // Replace the reply for the first partition with a successful one and run one more pass.
        reply.put(this.t1p0(), new EpochEndOffset(followerEpoch, 156L));
        fetcher.doWork();

        Assert.assertEquals(156L, (long)BoxesRunTime.unboxToLong((Object)truncationOffsets.getValue()));
    }

    /**
     * Asserts that both partitions added to a {@link ReplicaFetcherThread} report the
     * {@code Truncating} state before the first {@code doWork()} pass and the {@code Fetching}
     * state after it.
     */
    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig$ configFactory = KafkaConfig$.MODULE$;
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = configFactory.fromProps(testUtils.createBrokerConfig(1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18()));

        // Collaborators of the fetcher thread.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logs = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirs = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica followerReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition followerPartition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicas = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);
        int sharedEpoch = 4;

        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)followerReplica.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)0L))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.highWatermark())
                .andReturn((Object)new LogOffsetMetadata(0L, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)sharedEpoch)))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.endOffsetForEpoch(sharedEpoch))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch(0L, sharedEpoch)))
                .anyTimes();
        EasyMock.expect((Object)replicas.logManager()).andReturn((Object)logs).anyTimes();
        EasyMock.expect((Object)replicas.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirs).anyTimes();
        this.stub(followerReplica, followerPartition, replicas);
        // NOTE(review): the partition mock is not passed to replay here, unlike the other tests
        // in this class that replay it. Confirm whether that is intentional.
        EasyMock.replay(replicas, logs, quotaManager, followerReplica);

        // Epoch reply from the mocked leader: the same epoch with end offset 1 for both partitions.
        Tuple2[] replyEntries = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(sharedEpoch, 1L)),
                new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(sharedEpoch, 1L))
        };
        Map replyScala = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])replyEntries));
        java.util.Map reply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(replyScala).asJava();

        ReplicaFetcherMockBlockingSend leaderEndpoint = new ReplicaFetcherMockBlockingSend(reply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, replicas, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option)new Some((Object)leaderEndpoint));

        Tuple2[] initialPartitions = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
                new Tuple2((Object)this.t1p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialPartitions)));

        // Before any work is done both partitions report the Truncating state.
        Assert.assertEquals(
                (Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$),
                (Object)fetcher.fetchState(this.t1p0()).map((Function1 & Serializable & scala.Serializable)before0 -> before0.state()));
        Assert.assertEquals(
                (Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$),
                (Object)fetcher.fetchState(this.t1p1()).map((Function1 & Serializable & scala.Serializable)before1 -> before1.state()));

        fetcher.doWork();

        // After one pass both partitions report the Fetching state.
        Assert.assertEquals(
                (Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$),
                (Object)fetcher.fetchState(this.t1p0()).map((Function1 & Serializable & scala.Serializable)after0 -> after0.state()));
        Assert.assertEquals(
                (Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$),
                (Object)fetcher.fetchState(this.t1p1()).map((Function1 & Serializable & scala.Serializable)after1 -> after1.state()));
    }

    /**
     * Registers a callback on the mock network that removes the first partition from the
     * {@link ReplicaFetcherThread} when the epoch request is made, runs one {@code doWork()} pass,
     * and asserts that the single captured {@code truncateTo} offset is 49, the end offset the
     * mocked leader returned for the second partition.
     */
    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig$ configFactory = KafkaConfig$.MODULE$;
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = configFactory.fromProps(testUtils.createBrokerConfig(1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18()));

        // Collects every offset passed to Partition.truncateTo.
        Capture truncationOffsets = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        int logEndOffset = 100;

        // Collaborators of the fetcher thread.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logs = (LogManager)EasyMock.createNiceMock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirs = (ReplicaAlterLogDirsManager)EasyMock.createMock(ReplicaAlterLogDirsManager.class);
        Replica followerReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition followerPartition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicas = (ReplicaManager)EasyMock.createNiceMock(ReplicaManager.class);

        // The expect(...) below configures the call recorded on the line above it,
        // so these two statements must stay adjacent. truncateTo is expected exactly once.
        followerPartition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncationOffsets)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)followerReplica.logEndOffset()))
                .andReturn((Object)BoxesRunTime.boxToLong((long)logEndOffset))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.highWatermark())
                .andReturn((Object)new LogOffsetMetadata((long)(logEndOffset - 2), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.latestEpoch())
                .andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)))
                .anyTimes();
        EasyMock.expect((Object)followerReplica.endOffsetForEpoch(5))
                .andReturn((Object)new Some((Object)new OffsetAndEpoch((long)logEndOffset, 5)))
                .anyTimes();
        EasyMock.expect((Object)replicas.logManager()).andReturn((Object)logs).anyTimes();
        EasyMock.expect((Object)replicas.replicaAlterLogDirsManager()).andReturn((Object)alterLogDirs).anyTimes();
        this.stub(followerReplica, followerPartition, replicas);
        EasyMock.replay(replicas, logs, quotaManager, followerReplica, followerPartition);

        // Epoch reply from the mocked leader: epoch 5 with end offsets 52 and 49.
        Tuple2[] replyEntries = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)new EpochEndOffset(5, 52L)),
                new Tuple2((Object)this.t1p1(), (Object)new EpochEndOffset(5, 49L))
        };
        Map replyScala = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])replyEntries));
        java.util.Map reply = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(replyScala).asJava();

        ReplicaFetcherMockBlockingSend leaderEndpoint = new ReplicaFetcherMockBlockingSend(reply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), brokerConfig, replicas, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option)new Some((Object)leaderEndpoint));

        Tuple2[] initialPartitions = new Tuple2[]{
                new Tuple2((Object)this.t1p0(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())),
                new Tuple2((Object)this.t1p1(), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))
        };
        fetcher.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])initialPartitions)));

        // The callback drops this partition from the fetcher while the epoch request is handled.
        TopicPartition promotedPartition = this.t1p0();
        leaderEndpoint.setEpochRequestCallback((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> fetcher.removePartitions((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{promotedPartition}))));

        fetcher.doWork();

        Assert.assertEquals(49L, (long)BoxesRunTime.unboxToLong((Object)truncationOffsets.getValue()));
    }

    /**
     * Checks that a {@code ReplicaFetcherThread} can be shut down even though its
     * {@code BlockingSend} throws from both {@code initiateClose()} and {@code close()}.
     *
     * <p>The mock expects each of the two calls exactly once; the test passes when
     * {@code awaitShutdown()} returns and the mock verification succeeds.
     */
    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        // Broker config for node 1; every argument after the ZooKeeper connect string uses the Scala default.
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18());
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Record phase: both shutdown hooks of the network client fail, each exactly once.
        BlockingSend failingSend = (BlockingSend)EasyMock.createMock(BlockingSend.class);
        failingSend.initiateClose();
        EasyMock.expectLastCall().andThrow((Throwable)new IllegalArgumentException()).once();
        failingSend.close();
        EasyMock.expectLastCall().andThrow((Throwable)new IllegalStateException()).once();
        EasyMock.replay((Object[])new Object[]{failingSend});

        // Replica manager and quota are not needed for this scenario, so they are passed as null.
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob",
                0,
                this.brokerEndPoint(),
                brokerConfig,
                null,
                new Metrics(),
                (Time)new SystemTime(),
                null,
                (Option)new Some((Object)failingSend));

        fetcher.start();
        fetcher.initiateShutdown();
        fetcher.awaitShutdown();

        // Confirms that both initiateClose() and close() were actually invoked.
        EasyMock.verify((Object[])new Object[]{failingSend});
    }

    /**
     * Records replica/partition lookup expectations on {@code replicaManager} for the three
     * test partitions ({@code t1p0}, {@code t1p1}, {@code t2p1}), in that order.
     *
     * <p>Every lookup is allowed any number of times and resolves to the same {@code replica}
     * and {@code partition} instances.
     *
     * @param replica        replica returned by {@code localReplica} / {@code localReplicaOrException}
     * @param partition      partition returned by {@code getPartition}
     * @param replicaManager mock, still in record state, that receives the expectations
     * @return the expectation setters of the last recorded call ({@code getPartition} for {@code t2p1})
     */
    public IExpectationSetters<Option<Partition>> stub(Replica replica, Partition partition, ReplicaManager replicaManager) {
        this.stubLookupsFor(this.t1p0(), replica, partition, replicaManager);
        this.stubLookupsFor(this.t1p1(), replica, partition, replicaManager);
        return this.stubLookupsFor(this.t2p1(), replica, partition, replicaManager);
    }

    /** Records the three lookup expectations for a single topic partition. */
    private IExpectationSetters<Option<Partition>> stubLookupsFor(TopicPartition tp, Replica replica, Partition partition, ReplicaManager replicaManager) {
        EasyMock.expect(replicaManager.localReplica(tp))
                .andReturn(new Some<>(replica))
                .anyTimes();
        EasyMock.expect(replicaManager.localReplicaOrException(tp))
                .andReturn(replica)
                .anyTimes();
        return EasyMock.expect(replicaManager.getPartition(tp))
                .andReturn(new Some<>(partition))
                .anyTimes();
    }

    /**
     * Synthetic per-partition assertion body (by its name, the lambda of {@code assertPartitionStates}).
     *
     * <p>Asserts that {@code fetcher$1} has a fetch state for {@code tp} and that the state's
     * ready-for-fetch, truncating and delayed flags equal the expected values.
     *
     * @param fetcher$1               fetcher thread whose partition state is inspected
     * @param shouldBeReadyForFetch$1 expected value of {@code isReadyForFetch()}
     * @param shouldBeTruncatingLog$1 expected value of {@code isTruncating()}
     * @param shouldBeDelayed$1       expected value of {@code isDelayed()}
     * @param tp                      partition under test
     */
    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread fetcher$1, boolean shouldBeReadyForFetch$1, boolean shouldBeTruncatingLog$1, boolean shouldBeDelayed$1, TopicPartition tp) {
        // The state is looked up twice (presence check, then value), as in the original.
        Assert.assertTrue(fetcher$1.fetchState(tp).isDefined());
        PartitionFetchState state = (PartitionFetchState)fetcher$1.fetchState(tp).get();

        String readyNegation = shouldBeReadyForFetch$1 ? "" : " NOT";
        String readyMessage = "Partition " + tp + " should" + readyNegation + " be ready for fetching";
        Assert.assertEquals(readyMessage, Boolean.valueOf(shouldBeReadyForFetch$1), Boolean.valueOf(state.isReadyForFetch()));

        String truncatingNegation = shouldBeTruncatingLog$1 ? "" : " NOT";
        String truncatingMessage = "Partition " + tp + " should" + truncatingNegation + " be truncating its log";
        Assert.assertEquals(truncatingMessage, Boolean.valueOf(shouldBeTruncatingLog$1), Boolean.valueOf(state.isTruncating()));

        String delayedNegation = shouldBeDelayed$1 ? "" : " NOT";
        String delayedMessage = "Partition " + tp + " should" + delayedNegation + " be delayed";
        Assert.assertEquals(delayedMessage, Boolean.valueOf(shouldBeDelayed$1), Boolean.valueOf(state.isDelayed()));
    }
}

