/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Optional;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.cluster.Replica;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.FailedPartitions;
import kafka.server.FetchPartitionData;
import kafka.server.Fetching$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.OffsetAndEpoch;
import kafka.server.PartitionFetchState;
import kafka.server.PartitionFetchState$;
import kafka.server.ReplicaAlterLogDirsThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicaState;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.Assert;
import org.junit.Test;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005d\u0001B\u0001\u0003\u0001\u001d\u0011QDU3qY&\u001c\u0017-\u00117uKJdun\u001a#jeN$\u0006N]3bIR+7\u000f\u001e\u0006\u0003\u0007\u0011\taa]3sm\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0002\u0017\u0005)1oY1mC&\u0011QB\u0003\u0002\u0007\u0003:L(+\u001a4\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001b\u0002\u000b\u0001\u0005\u0004%I!F\u0001\u0005iF\u0002\b'F\u0001\u0017!\t9r$D\u0001\u0019\u0015\tI\"$\u0001\u0004d_6lwN\u001c\u0006\u0003\u000bmQ!\u0001H\u000f\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005q\u0012aA8sO&\u0011\u0001\u0005\u0007\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0011\u0019\u0011\u0003\u0001)A\u0005-\u0005)A/\r91A!9A\u0005\u0001b\u0001\n\u0013)\u0012\u0001\u0002;2aFBaA\n\u0001!\u0002\u00131\u0012!\u0002;2aF\u0002\u0003b\u0002\u0015\u0001\u0005\u0004%I!K\u0001\u0011M\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N,\u0012A\u000b\t\u0003%-J!\u0001\f\u0002\u0003!\u0019\u000b\u0017\u000e\\3e!\u0006\u0014H/\u001b;j_:\u001c\bB\u0002\u0018\u0001A\u0003%!&A\tgC&dW\r\u001a)beRLG/[8og\u0002BQ\u0001\r\u0001\u0005\nE\nab\u001c4gg\u0016$\u0018I\u001c3Fa>\u001c\u0007\u000eF\u00023ki\u0002\"AE\u001a\n\u0005Q\u0012!AD(gMN,G/\u00118e\u000bB|7\r\u001b\u0005\u0006m=\u0002\raN\u0001\fM\u0016$8\r[(gMN,G\u000f\u0005\u0002\nq%\u0011\u0011H\u0003\u0002\u0005\u0019>tw\rC\u0004<_A\u0005\t\u0019\u0001\u001f\u0002\u00171,\u0017\rZ3s\u000bB|7\r\u001b\t\u0003\u0013uJ!A\u0010\u0006\u0003\u0007%sG\u000fC\u0003A\u0001\u0011\u0005\u0011)\u0001\u0012jgN,Xm]#q_\u000eD'+Z9vKN$hI]8n\u0019>\u001c\u0017\r\u001c*fa2L7-\u0019\u000b\u0002\u0005B\u0011\u0011bQ\u0005\u0003\t*\u0011A!\u00168ji\"\u0012qH\u0012\t\u0003\u000f*k\u0011\u0001\u0013\u0006\u0003\u0013v\tQA[;oSRL!a\u0013%\u0003\tQ+7\u000f\u001e\u0005\u0006\u001b\u0002!\t!Q\u0001>M\u0016$8\r[#q_\u000eD7O\u0012:p[2+\u0017\rZ3s'\"|W\u000f\u001c3IC:$G.Z#yG\u0016\u0004H/[8o\rJ|WnR3
u\u0019>\u001c\u0017\r\u001c*fa2L7-\u0019\u0015\u0003\u0019\u001aCQ\u0001\u0015\u0001\u0005\u0002\u0005\u000bQd\u001d5pk2$GK];oG\u0006$X\rV8SKBd\u0017nY1PM\u001a\u001cX\r\u001e\u0015\u0003\u001f\u001aCQa\u0015\u0001\u0005\u0002\u0005\u000bQf\u001d5pk2$GK];oG\u0006$X\rV8F]\u0012|eMZ:fi>3G*\u0019:hKN$8i\\7n_:,\u0005o\\2iQ\t\u0011f\tC\u0003W\u0001\u0011\u0005\u0011)A!tQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_&s\u0017\u000e^5bY\u001a+Go\u00195PM\u001a\u001cX\r^%g%\u0016\u0004H.[2b%\u0016$XO\u001d8t+:$WMZ5oK\u0012|eMZ:fi\"\u0012QK\u0012\u0005\u00063\u0002!\t!Q\u0001,g\"|W\u000f\u001c3Q_2d\u0017J\u001c3fM&t\u0017\u000e^3ms&3'+\u001a9mS\u000e\fgj\u001c;Bm\u0006LG.\u00192mK\"\u0012\u0001L\u0012\u0005\u00069\u0002!\t!Q\u0001'g\"|W\u000f\u001c3GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195P]2L\bFA.G\u0011\u0015y\u0006\u0001\"\u0001B\u0003q\u0019\bn\\;mI\u001a+Go\u00195P]\u0016\u0014V\r\u001d7jG\u0006\fE/\u0011+j[\u0016D#A\u0018$\t\u000b\t\u0004A\u0011A!\u0002[MDw.\u001e7e\r\u0016$8\r\u001b(p]\u0012+G.Y=fI\u0006sGMT8o)J,hnY1uS:<'+\u001a9mS\u000e\f7\u000f\u000b\u0002b\r\")Q\r\u0001C\u0001M\u0006!1\u000f^;c)\u001d9go_?\u0000\u0003\u0007\u00012\u0001[6n\u001b\u0005I'B\u00016\u001e\u0003!)\u0017m]=n_\u000e\\\u0017B\u00017j\u0005MIU\t\u001f9fGR\fG/[8o'\u0016$H/\u001a:t!\rIa\u000e]\u0005\u0003_*\u0011aa\u00149uS>t\u0007CA9u\u001b\u0005\u0011(BA:\u0005\u0003\u001d\u0019G.^:uKJL!!\u001e:\u0003\u0013A\u000b'\u000f^5uS>t\u0007\"B<e\u0001\u0004A\u0018a\u0003:fa2L7-\u0019+2aB\u0002\"!]=\n\u0005i\u0014(a\u0002*fa2L7-\u0019\u0005\u0006y\u0012\u0004\r\u0001_\u0001\fe\u0016\u0004H.[2b)F\u0002\u0018\u0007C\u0003\u007fI\u0002\u0007\u00010A\u0007gkR,(/\u001a*fa2L7-\u0019\u0005\u0007\u0003\u0003!\u0007\u0019\u00019\u0002\u0013A\f'\u000f^5uS>t\u0007bBA\u0003I\u0002\u0007\u0011qA\u0001\u000fe\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s!\r\u0011\u0012\u0011B\u0005\u0004\u0003\u0017\u0011!A\u0004*fa2L7-Y'b]\u0006<WM\u001d\u0005\b\u0003\u001f\u0001A\u0011AA\t\u0003U\u0019H/\u001e2X
SRDg)\u001a;dQ6+7o]1hKN$b\"a\u0005\u0002\u0016\u0005]\u0011\u0011DA\u000e\u0003;\ty\u0002E\u0002iW\nCaa^A\u0007\u0001\u0004A\bB\u0002?\u0002\u000e\u0001\u0007\u0001\u0010\u0003\u0004\u007f\u0003\u001b\u0001\r\u0001\u001f\u0005\b\u0003\u0003\ti\u00011\u0001q\u0011!\t)!!\u0004A\u0002\u0005\u001d\u0001\u0002CA\u0011\u0003\u001b\u0001\r!a\t\u0002!I,7\u000f]8og\u0016\u001c\u0015\r\u001c7cC\u000e\\\u0007#\u00025\u0002&\u0005%\u0012bAA\u0014S\n91)\u00199ukJ,\u0007CB\u0005\u0002,\u0005=\")C\u0002\u0002.)\u0011\u0011BR;oGRLwN\\\u0019\u0011\r\u0005E\u0012qGA\u001e\u001b\t\t\u0019DC\u0002\u00026)\t!bY8mY\u0016\u001cG/[8o\u0013\u0011\tI$a\r\u0003\u0007M+\u0017\u000f\u0005\u0004\n\u0003{1\u0012\u0011I\u0005\u0004\u0003\u007fQ!A\u0002+va2,'\u0007E\u0002\u0013\u0003\u0007J1!!\u0012\u0003\u0005I1U\r^2i!\u0006\u0014H/\u001b;j_:$\u0015\r^1\t\u0013\u0005%\u0003!%A\u0005\n\u0005-\u0013\u0001G8gMN,G/\u00118e\u000bB|7\r\u001b\u0013eK\u001a\fW\u000f\u001c;%eU\u0011\u0011Q\n\u0016\u0004y\u0005=3FAA)!\u0011\t\u0019&!\u0018\u000e\u0005\u0005U#\u0002BA,\u00033\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005m#\"\u0001\u0006b]:|G/\u0019;j_:LA!a\u0018\u0002V\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
public class ReplicaAlterLogDirsThreadTest {
    // Fixture partitions shared by the tests: partitions 0 and 1 of "topic1".
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    // Handed (through failedPartitions()) to every ReplicaAlterLogDirsThread the tests construct.
    private final FailedPartitions failedPartitions = new FailedPartitions();

    /**
     * Accessor for the fixture partition 0 of {@code "topic1"}.
     *
     * @return the shared {@code t1p0} field
     */
    private TopicPartition t1p0() {
        final TopicPartition firstPartition = this.t1p0;
        return firstPartition;
    }

    /**
     * Accessor for the fixture partition 1 of {@code "topic1"}.
     *
     * @return the shared {@code t1p1} field
     */
    private TopicPartition t1p1() {
        final TopicPartition secondPartition = this.t1p1;
        return secondPartition;
    }

    /**
     * Accessor for the {@code FailedPartitions} instance shared by all tests in this class.
     *
     * @return the shared {@code failedPartitions} field
     */
    private FailedPartitions failedPartitions() {
        final FailedPartitions shared = this.failedPartitions;
        return shared;
    }

    /**
     * Builds the {@code OffsetAndEpoch} value the tests pass to {@code addPartitions}.
     *
     * @param fetchOffset offset forwarded as the first constructor argument
     * @param leaderEpoch epoch forwarded as the second constructor argument
     * @return a new {@code OffsetAndEpoch(fetchOffset, leaderEpoch)}
     */
    private OffsetAndEpoch offsetAndEpoch(long fetchOffset, int leaderEpoch) {
        final OffsetAndEpoch pair = new OffsetAndEpoch(fetchOffset, leaderEpoch);
        return pair;
    }

    /**
     * Default value for the second ({@code leaderEpoch}) argument of
     * {@link #offsetAndEpoch(long, int)}; callers in this class pass it explicitly.
     *
     * @return the constant {@code 1}
     */
    private int offsetAndEpoch$default$2() {
        final int defaultLeaderEpoch = 1;
        return defaultLeaderEpoch;
    }

    /**
     * Checks that {@code fetchEpochEndOffsets} answers a two-partition epoch request with the
     * values each mocked {@code Partition} returns from {@code lastOffsetForLeaderEpoch}.
     */
    @Test
    public void issuesEpochRequestFromLocalReplica() {
        // Broker config for id 1 at "localhost:1234"; every remaining argument comes from the
        // createBrokerConfig$default$N() accessors.
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        Partition partitionT1p0 = (Partition)EasyMock.createMock(Partition.class);
        Partition partitionT1p1 = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        // Per-partition epoch and end offset that the mocked partitions will report.
        int leaderEpochT1p0 = 2;
        int leaderEpochT1p1 = 5;
        int leoT1p0 = 13;
        int leoT1p1 = 232;
        // Record phase: the replica manager hands out the mocked partitions, and each partition
        // answers the epoch lookup with its own (epoch, offset) pair.
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partitionT1p0);
        EasyMock.expect((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p0, false)).andReturn((Object)new EpochEndOffset(leaderEpochT1p0, (long)leoT1p0)).anyTimes();
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p1(), false)).andStubReturn((Object)partitionT1p1);
        EasyMock.expect((Object)partitionT1p1.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p1, false)).andReturn((Object)new EpochEndOffset(leaderEpochT1p1, (long)leoT1p1)).anyTimes();
        EasyMock.replay((Object[])new Object[]{partitionT1p0, partitionT1p1, replicaManager});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        // The last two constructor arguments are passed as null in this test.
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, null, null);
        // Request: one PartitionData per partition, keyed by TopicPartition.
        Map result = thread.fetchEpochEndOffsets((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p1))})));
        // Expected: the same (epoch, offset) pairs, each tagged with Errors.NONE.
        Map expected = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.NONE, leaderEpochT1p0, (long)leoT1p0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.NONE, leaderEpochT1p1, (long)leoT1p1))}));
        Assert.assertEquals((String)"results from leader epoch request should have offset from local replica", (Object)expected, (Object)result);
    }

    /**
     * Checks that when {@code getPartitionOrException} throws {@code KafkaStorageException} for
     * one partition, {@code fetchEpochEndOffsets} still returns the other partition's result and
     * reports {@code KAFKA_STORAGE_ERROR} for the failing one.
     */
    @Test
    public void fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica() {
        // Broker config for id 1 at "localhost:1234"; every remaining argument comes from the
        // createBrokerConfig$default$N() accessors.
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        Partition partitionT1p0 = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        int leaderEpoch = 2;
        int leo = 13;
        // t1p0 resolves normally and answers the epoch lookup with (leaderEpoch, leo).
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partitionT1p0);
        EasyMock.expect((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch, (long)leo)).anyTimes();
        // t1p1 lookup fails exactly once with a storage exception.
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p1(), false)).andThrow((Throwable)new KafkaStorageException()).once();
        EasyMock.replay((Object[])new Object[]{partitionT1p0, replicaManager});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        // The last two constructor arguments are passed as null in this test.
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, null, null);
        // Both partitions are requested with the same leaderEpoch.
        Map result = thread.fetchEpochEndOffsets((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch))})));
        // Expected: t1p0 succeeds with Errors.NONE; t1p1 maps to KAFKA_STORAGE_ERROR with -1 epoch and -1 offset.
        Map expected = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new EpochEndOffset(Errors.NONE, leaderEpoch, (long)leo)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, -1, -1L))}));
        Assert.assertEquals((Object)expected, (Object)result);
    }

    /**
     * Runs one {@code doWork()} pass over two partitions and checks the offset each partition is
     * truncated to: {@code t1p0} is expected to be truncated to {@code replicaT1p0LEO} (190) and
     * {@code t1p1} to {@code futureReplicaLEO} (191).
     */
    @Test
    public void shouldTruncateToReplicaOffset() {
        // Captures record every offset passed to truncateTo on each partition.
        Capture truncateCaptureT1p0 = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        Capture truncateCaptureT1p1 = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        // Broker config for id 1 at "localhost:1234"; every remaining argument comes from the
        // createBrokerConfig$default$N() accessors.
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        Replica replicaT1p0 = (Replica)EasyMock.createNiceMock(Replica.class);
        Replica replicaT1p1 = (Replica)EasyMock.createNiceMock(Replica.class);
        Replica futureReplicaT1p0 = (Replica)EasyMock.createNiceMock(Replica.class);
        Replica futureReplicaT1p1 = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partitionT1p0 = (Partition)EasyMock.createMock(Partition.class);
        Partition partitionT1p1 = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int leaderEpoch = 2;
        // Both future replicas report 191; the current replicas report 190 (t1p0) and 192 (t1p1),
        // i.e. one below and one above the future replica's value.
        int futureReplicaLEO = 191;
        int replicaT1p0LEO = 190;
        int replicaT1p1LEO = 192;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partitionT1p0);
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p1(), false)).andStubReturn((Object)partitionT1p1);
        EasyMock.expect((Object)replicaManager.futureLocalReplicaOrException(this.t1p0())).andStubReturn((Object)futureReplicaT1p0);
        EasyMock.expect((Object)replicaManager.futureLocalReplicaOrException(this.t1p1())).andStubReturn((Object)futureReplicaT1p1);
        // truncateTo is recorded by invoking it on the mock; the following expect(BoxedUnit.UNIT)
        // configures that recorded call, so each pair of statements must stay together and in order.
        partitionT1p0.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateCaptureT1p0)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        partitionT1p1.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateCaptureT1p1)), EasyMock.anyBoolean());
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureReplicaT1p0.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureReplicaT1p1.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object)futureReplicaT1p0.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)futureReplicaT1p0.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch, (long)replicaT1p0LEO)).anyTimes();
        EasyMock.expect((Object)futureReplicaT1p1.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).anyTimes();
        EasyMock.expect((Object)futureReplicaT1p1.endOffsetForEpoch(leaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch))).anyTimes();
        EasyMock.expect((Object)partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch, (long)replicaT1p1LEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        // Helper defined elsewhere in this class; NOTE(review): presumably stubs the replica
        // lookups and fetchMessages on replicaManager - confirm against its definition.
        this.stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplicaT1p0, partitionT1p0, replicaManager, (Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        // futureReplicaT1p1 is replayed along with the other mocks: expectations were recorded on
        // it above, and a mock that is never switched to replay mode does not serve them.
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, replicaT1p0, replicaT1p1, futureReplicaT1p0, futureReplicaT1p1, partitionT1p0, partitionT1p1});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        // Both partitions start at fetch offset 0 with the default leader epoch.
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        // t1p0: current replica's offset (190); t1p1: future replica's offset (191).
        Assert.assertEquals((long)replicaT1p0LEO, (long)BoxesRunTime.unboxToLong((Object)truncateCaptureT1p0.getValue()));
        Assert.assertEquals((long)futureReplicaLEO, (long)BoxesRunTime.unboxToLong((Object)truncateCaptureT1p1.getValue()));
    }

    /**
     * Runs two {@code doWork()} passes in which the mocked partition and future replica report
     * different epochs, and checks that {@code replicaEpochEndOffset} (190) appears among the
     * offsets passed to {@code truncateTo}.
     */
    @Test
    public void shouldTruncateToEndOffsetOfLargestCommonEpoch() {
        // Records every offset passed to partition.truncateTo.
        Capture truncateToCapture = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        // Broker config for id 1 at "localhost:1234"; every remaining argument comes from the
        // createBrokerConfig$default$N() accessors.
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Replica futureReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int leaderEpoch = 5;
        int futureReplicaLEO = 195;
        int replicaLEO = 200;
        int replicaEpochEndOffset = 190;
        int futureReplicaEpochEndOffset = 191;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partition);
        EasyMock.expect((Object)replicaManager.futureLocalReplicaOrException(this.t1p0())).andStubReturn((Object)futureReplica);
        // truncateTo is recorded by invoking it on the mock; the following expect(BoxedUnit.UNIT)
        // configures that recorded call, so the two statements must stay together and in order.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncateToCapture)), EasyMock.eq((boolean)true));
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureReplica.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        // latestEpoch answers leaderEpoch (5) on its first call and leaderEpoch - 2 (3) on its second.
        EasyMock.expect((Object)futureReplica.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).once();
        EasyMock.expect((Object)futureReplica.latestEpoch()).andReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)(leaderEpoch - 2)))).once();
        // Lookup for epoch 5: the partition answers (epoch 4, offset 200); the future replica's
        // end offset for epoch 4 is reported as (offset 195, epoch 3).
        EasyMock.expect((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).andReturn((Object)new EpochEndOffset(leaderEpoch - 1, (long)replicaLEO)).anyTimes();
        EasyMock.expect((Object)futureReplica.endOffsetForEpoch(leaderEpoch - 1)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch - 2))).anyTimes();
        // Lookup for epoch 3: the partition answers (epoch 3, offset 190); the future replica's
        // end offset for epoch 3 is reported as (offset 191, epoch 3).
        EasyMock.expect((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch - 2, false)).andReturn((Object)new EpochEndOffset(leaderEpoch - 2, (long)replicaEpochEndOffset)).anyTimes();
        EasyMock.expect((Object)futureReplica.endOffsetForEpoch(leaderEpoch - 2)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaEpochEndOffset, leaderEpoch - 2))).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        // Helper defined elsewhere in this class; the same replica mock is passed for its first
        // two arguments. NOTE(review): presumably stubs fetchMessages - confirm against its definition.
        this.stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, (Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, replica, futureReplica, partition});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        // Two passes, matching the two single-use latestEpoch expectations recorded above.
        thread.doWork();
        thread.doWork();
        // Passes if 190 is anywhere in the captured offsets; the captured list is included in the failure message.
        Assert.assertTrue((String)new StringBuilder().append((Object)"Expected offset ").append((Object)BoxesRunTime.boxToInteger((int)replicaEpochEndOffset)).append((Object)" in captured truncation offsets ").append((Object)truncateToCapture.getValues()).toString(), (boolean)((SeqLike)JavaConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)replicaEpochEndOffset)));
    }

    /**
     * Checks that when the future replica's {@code latestEpoch()} is {@code None}, a single
     * {@code doWork()} pass truncates the partition to the initial fetch offset (100) it was
     * added with.
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset() {
        // Records every offset passed to partition.truncateTo.
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        // Broker config for id 1 at "localhost:1234"; every remaining argument comes from the
        // createBrokerConfig$default$N() accessors.
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Replica futureReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int initialFetchOffset = 100;
        int futureReplicaLEO = 111;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partition);
        // truncateTo is recorded by invoking it on the mock; the following expect(BoxedUnit.UNIT)
        // configures that recorded call, so the two statements must stay together and in order.
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.eq((boolean)true));
        EasyMock.expect((Object)BoxedUnit.UNIT).anyTimes();
        EasyMock.expect((Object)replicaManager.futureLocalReplicaOrException(this.t1p0())).andStubReturn((Object)futureReplica);
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureReplica.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        // The future replica reports no latest epoch at all.
        EasyMock.expect((Object)futureReplica.latestEpoch()).andReturn((Object)None$.MODULE$).anyTimes();
        // Helper defined elsewhere in this class; the same replica mock is passed for its first
        // two arguments. NOTE(review): presumably stubs fetchMessages - confirm against its definition.
        this.stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, (Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, replica, futureReplica, partition});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        // The partition is added at initialFetchOffset with the default leader epoch.
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(initialFetchOffset, this.offsetAndEpoch$default$2()))})));
        thread.doWork();
        Assert.assertEquals((String)"Expected future replica to truncate to initial fetch offset if replica returns UNDEFINED_EPOCH_OFFSET", (long)initialFetchOffset, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfReplicaNotAvailable() {
        Capture truncated = EasyMock.newCapture((CaptureType)CaptureType.ALL);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)EasyMock.createMock(LogManager.class);
        Replica replica = (Replica)EasyMock.createNiceMock(Replica.class);
        Replica futureReplica = (Replica)EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition)EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)EasyMock.createMock(ReplicaManager.class);
        Capture responseCallback = EasyMock.newCapture();
        int futureReplicaLeaderEpoch = 1;
        int futureReplicaLEO = 290;
        int replicaLEO = 300;
        EasyMock.expect((Object)replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn((Object)partition);
        partition.truncateTo(BoxesRunTime.unboxToLong((Object)EasyMock.capture((Capture)truncated)), EasyMock.eq((boolean)true));
        EasyMock.expect((Object)BoxedUnit.UNIT).once();
        EasyMock.expect((Object)replicaManager.futureLocalReplicaOrException(this.t1p0())).andStubReturn((Object)futureReplica);
        EasyMock.expect((Object)futureReplica.latestEpoch()).andStubReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)futureReplicaLeaderEpoch)));
        EasyMock.expect((Object)futureReplica.endOffsetForEpoch(futureReplicaLeaderEpoch)).andReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, futureReplicaLeaderEpoch)));
        EasyMock.expect((Object)BoxesRunTime.boxToLong((long)futureReplica.logEndOffset())).andReturn((Object)BoxesRunTime.boxToLong((long)futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object)replicaManager.localReplica(this.t1p0())).andReturn((Object)new Some((Object)replica)).anyTimes();
        EasyMock.expect((Object)replicaManager.futureLocalReplica(this.t1p0())).andReturn((Object)new Some((Object)futureReplica)).anyTimes();
        EasyMock.expect((Object)replicaManager.futureLocalReplicaOrException(this.t1p0())).andReturn((Object)futureReplica).anyTimes();
        EasyMock.expect((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), futureReplicaLeaderEpoch, false)).andReturn((Object)new EpochEndOffset(Errors.REPLICA_NOT_AVAILABLE, -1, -1L)).times(3).andReturn((Object)new EpochEndOffset(futureReplicaLeaderEpoch, (long)replicaLEO));
        EasyMock.expect((Object)replicaManager.logManager()).andReturn((Object)logManager).anyTimes();
        replicaManager.fetchMessages(EasyMock.anyLong(), EasyMock.anyInt(), EasyMock.anyInt(), EasyMock.anyInt(), BoxesRunTime.unboxToBoolean((Object)EasyMock.anyObject()), (Seq)EasyMock.anyObject(), (ReplicaQuota)EasyMock.anyObject(), (Function1)EasyMock.capture((Capture)responseCallback), (IsolationLevel)EasyMock.anyObject());
        EasyMock.expect((Object)BoxedUnit.UNIT).andAnswer((IAnswer)new IAnswer<BoxedUnit>(this, responseCallback){
            private final Capture responseCallback$2;

            public void answer() {
                ((Function1)this.responseCallback$2.getValue()).apply((Object)Seq$.MODULE$.empty());
            }
            {
                this.responseCallback$2 = responseCallback$2;
            }
        }).anyTimes();
        EasyMock.replay((Object[])new Object[]{replicaManager, logManager, quotaManager, replica, futureReplica, partition});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)new Serializable(this, thread){
            public static final long serialVersionUID = 0L;
            private final ReplicaAlterLogDirsThread thread$1;

            public final void apply(int x$1) {
                this.apply$mcVI$sp(x$1);
            }

            public void apply$mcVI$sp(int x$1) {
                this.thread$1.doWork();
            }
            {
                this.thread$1 = thread$1;
            }
        });
        Assert.assertEquals((long)0L, (long)truncated.getValues().size());
        thread.doWork();
        Assert.assertEquals((long)futureReplicaLEO, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    /**
     * Verifies that the leader-epoch lookup, and the truncation that follows it, are
     * performed a single time even though the fetcher thread runs several iterations.
     *
     * <p>{@code partition} is created with {@code createMock}, and exactly one
     * {@code lastOffsetForLeaderEpoch} call and one {@code truncateTo(190, true)} call are
     * recorded on it. The thread then runs four {@code doWork()} passes and
     * {@code EasyMock.verify(partition)} checks that the recorded calls were made.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnly() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        ReplicationQuotaManager quotaManager = EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = EasyMock.createMock(LogManager.class);
        Replica replica = EasyMock.createNiceMock(Replica.class);
        Replica futureReplica = EasyMock.createNiceMock(Replica.class);
        Partition partition = EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> responseCallback = EasyMock.newCapture();
        int leaderEpoch = 5;
        long futureReplicaLEO = 190L;
        long replicaLEO = 213L;

        EasyMock.expect((Object) replicaManager.getPartitionOrException(this.t1p0(), false)).andStubReturn(partition);
        // Recorded without a repeat count, so the mock accepts this lookup only once.
        EasyMock.expect((Object) partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false))
                .andReturn(new EpochEndOffset(leaderEpoch, replicaLEO));
        // truncateTo is a void call: record it, then constrain the last recorded call.
        partition.truncateTo(futureReplicaLEO, true);
        EasyMock.expectLastCall().once();
        EasyMock.expect((Object) replicaManager.futureLocalReplicaOrException(this.t1p0())).andStubReturn(futureReplica);
        EasyMock.expect((Object) futureReplica.latestEpoch()).andStubReturn(new Some((Object) BoxesRunTime.boxToInteger(leaderEpoch)));
        EasyMock.expect((Object) BoxesRunTime.boxToLong(futureReplica.logEndOffset())).andReturn(BoxesRunTime.boxToLong(futureReplicaLEO)).anyTimes();
        EasyMock.expect((Object) futureReplica.endOffsetForEpoch(leaderEpoch)).andReturn(new Some((Object) new OffsetAndEpoch(futureReplicaLEO, leaderEpoch)));
        EasyMock.expect((Object) replicaManager.logManager()).andReturn(logManager).anyTimes();
        this.stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback);
        EasyMock.replay(replicaManager, logManager, quotaManager, replica, futureReplica, partition);

        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);
        thread.addPartitions((Map) Map$.MODULE$.apply((Seq) Predef$.MODULE$.wrapRefArray((Object[]) new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object) this.t1p0()), (Object) this.offsetAndEpoch(0L, this.offsetAndEpoch$default$2()))})));

        // Inclusive range 0..3, i.e. four fetcher iterations. A plain loop replaces the
        // decompiler-emitted closure object, which was not valid Java.
        for (int pass = 0; pass <= 3; pass++) {
            thread.doWork();
        }

        EasyMock.verify(partition);
    }

    /**
     * Checks that {@code buildFetch} asks for a single partition when two partitions are
     * both offered in the {@code Fetching} state.
     *
     * <p>t1p0 (offset 150) and t1p1 (offset 160) are passed in; the assertions require a
     * request with {@code minBytes} 0 that holds exactly one entry, t1p0 at offset 150,
     * and an empty set of partitions with errors.
     */
    @Test
    public void shouldFetchOneReplicaAtATime() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18()));

        // Collaborators: nice mocks tolerate unrecorded calls, plain mocks do not.
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        Replica futureReplica = (Replica) EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        EasyMock.expect((Object) BoxesRunTime.boxToLong(futureReplica.logStartOffset()))
                .andReturn((Object) BoxesRunTime.boxToLong(123L))
                .anyTimes();
        EasyMock.expect((Object) replicaManager.logManager()).andReturn((Object) logManager).anyTimes();
        this.stub(replica, replica, futureReplica, partition, replicaManager);
        EasyMock.replay(new Object[]{replicaManager, logManager, quotaManager, replica, futureReplica, partition});

        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        int leaderEpoch = 1;
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);

        Tuple2[] initialOffsets = new Tuple2[]{
            new Tuple2((Object) this.t1p0(), (Object) this.offsetAndEpoch(0L, leaderEpoch)),
            new Tuple2((Object) this.t1p1(), (Object) this.offsetAndEpoch(0L, leaderEpoch))
        };
        thread.addPartitions((Map) Map$.MODULE$.apply((Seq) Predef$.MODULE$.wrapRefArray((Object[]) initialOffsets)));

        Tuple2[] fetchStates = new Tuple2[]{
            new Tuple2((Object) this.t1p0(), (Object) PartitionFetchState$.MODULE$.apply(150L, leaderEpoch, (ReplicaState) Fetching$.MODULE$)),
            new Tuple2((Object) this.t1p1(), (Object) PartitionFetchState$.MODULE$.apply(160L, leaderEpoch, (ReplicaState) Fetching$.MODULE$))
        };
        AbstractFetcherThread.ResultWithPartitions built = thread.buildFetch((Map) Map$.MODULE$.apply((Seq) Predef$.MODULE$.wrapRefArray((Object[]) fetchStates)));
        if (built == null) {
            throw new MatchError((Object) built);
        }

        Option requestOpt = (Option) built.result();
        Set erroredPartitions = built.partitionsWithError();
        Assert.assertTrue(requestOpt.isDefined());
        FetchRequest.Builder requestBuilder = (FetchRequest.Builder) requestOpt.get();
        Assert.assertFalse(requestBuilder.fetchData().isEmpty());
        Assert.assertFalse(erroredPartitions.nonEmpty());

        FetchRequest request = (FetchRequest) requestBuilder.build();
        Assert.assertEquals(0L, (long) request.minBytes());
        Seq fetchInfos = ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(request.fetchData()).asScala()).toSeq();
        Assert.assertEquals(1L, (long) fetchInfos.length());
        Tuple2 onlyEntry = (Tuple2) fetchInfos.head();
        Assert.assertEquals("Expected fetch request for first partition", (Object) this.t1p0(), onlyEntry._1());
        Assert.assertEquals(150L, ((FetchRequest.PartitionData) onlyEntry._2()).fetchOffset);
    }

    /**
     * Checks which partitions {@code buildFetch} includes across three rounds:
     * <ol>
     *   <li>t1p0 fetching at 150, t1p1 truncating: the request holds only t1p0 at 150;</li>
     *   <li>t1p0 fetching at 140, t1p1 fetching but delayed: the request holds only t1p0 at 140;</li>
     *   <li>both partitions delayed: no request is produced.</li>
     * </ol>
     * Every round also asserts that no partitions are reported with errors.
     */
    @Test
    public void shouldFetchNonDelayedAndNonTruncatingReplicas() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18()));

        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        Replica futureReplica = (Replica) EasyMock.createNiceMock(Replica.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        EasyMock.expect((Object) BoxesRunTime.boxToLong(futureReplica.logStartOffset()))
                .andReturn((Object) BoxesRunTime.boxToLong(123L))
                .anyTimes();
        EasyMock.expect((Object) replicaManager.logManager()).andReturn((Object) logManager).anyTimes();
        this.stub(replica, replica, futureReplica, partition, replicaManager);
        EasyMock.replay(new Object[]{replicaManager, logManager, quotaManager, replica, futureReplica, partition});

        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        int leaderEpoch = 1;
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", endPoint, config, this.failedPartitions(), replicaManager, quotaManager, null);

        Tuple2[] initialOffsets = new Tuple2[]{
            new Tuple2((Object) this.t1p0(), (Object) this.offsetAndEpoch(0L, leaderEpoch)),
            new Tuple2((Object) this.t1p1(), (Object) this.offsetAndEpoch(0L, leaderEpoch))
        };
        thread.addPartitions((Map) Map$.MODULE$.apply((Seq) Predef$.MODULE$.wrapRefArray((Object[]) initialOffsets)));

        // Round 1: the second partition is truncating.
        Tuple2[] truncatingStates = new Tuple2[]{
            new Tuple2((Object) this.t1p0(), (Object) PartitionFetchState$.MODULE$.apply(150L, leaderEpoch, (ReplicaState) Fetching$.MODULE$)),
            new Tuple2((Object) this.t1p1(), (Object) PartitionFetchState$.MODULE$.apply(160L, leaderEpoch, (ReplicaState) Truncating$.MODULE$))
        };
        AbstractFetcherThread.ResultWithPartitions firstResult = thread.buildFetch((Map) Map$.MODULE$.apply((Seq) Predef$.MODULE$.wrapRefArray((Object[]) truncatingStates)));
        if (firstResult == null) {
            throw new MatchError((Object) firstResult);
        }
        Option firstRequestOpt = (Option) firstResult.result();
        Set firstErrors = firstResult.partitionsWithError();
        Assert.assertTrue(firstRequestOpt.isDefined());
        FetchRequest.Builder firstBuilder = (FetchRequest.Builder) firstRequestOpt.get();
        Assert.assertFalse(firstBuilder.fetchData().isEmpty());
        Assert.assertFalse(firstErrors.nonEmpty());
        Seq firstInfos = ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(((FetchRequest) firstBuilder.build()).fetchData()).asScala()).toSeq();
        Assert.assertEquals(1L, (long) firstInfos.length());
        Tuple2 firstEntry = (Tuple2) firstInfos.head();
        Assert.assertEquals("Expected fetch request for non-truncating partition", (Object) this.t1p0(), firstEntry._1());
        Assert.assertEquals(150L, ((FetchRequest.PartitionData) firstEntry._2()).fetchOffset);

        // Round 2: the second partition is fetching but delayed.
        Tuple2[] oneDelayedStates = new Tuple2[]{
            new Tuple2((Object) this.t1p0(), (Object) PartitionFetchState$.MODULE$.apply(140L, leaderEpoch, (ReplicaState) Fetching$.MODULE$)),
            new Tuple2((Object) this.t1p1(), (Object) new PartitionFetchState(160L, leaderEpoch, new DelayedItem(5000L), (ReplicaState) Fetching$.MODULE$))
        };
        AbstractFetcherThread.ResultWithPartitions secondResult = thread.buildFetch((Map) Map$.MODULE$.apply((Seq) Predef$.MODULE$.wrapRefArray((Object[]) oneDelayedStates)));
        if (secondResult == null) {
            throw new MatchError((Object) secondResult);
        }
        Option secondRequestOpt = (Option) secondResult.result();
        Set secondErrors = secondResult.partitionsWithError();
        Assert.assertTrue(secondRequestOpt.isDefined());
        FetchRequest.Builder secondBuilder = (FetchRequest.Builder) secondRequestOpt.get();
        Assert.assertFalse(secondBuilder.fetchData().isEmpty());
        Assert.assertFalse(secondErrors.nonEmpty());
        Seq secondInfos = ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(((FetchRequest) secondBuilder.build()).fetchData()).asScala()).toSeq();
        Assert.assertEquals(1L, (long) secondInfos.length());
        Tuple2 secondEntry = (Tuple2) secondInfos.head();
        Assert.assertEquals("Expected fetch request for non-delayed partition", (Object) this.t1p0(), secondEntry._1());
        Assert.assertEquals(140L, ((FetchRequest.PartitionData) secondEntry._2()).fetchOffset);

        // Round 3: both partitions are delayed.
        Tuple2[] allDelayedStates = new Tuple2[]{
            new Tuple2((Object) this.t1p0(), (Object) new PartitionFetchState(140L, leaderEpoch, new DelayedItem(5000L), (ReplicaState) Fetching$.MODULE$)),
            new Tuple2((Object) this.t1p1(), (Object) new PartitionFetchState(160L, leaderEpoch, new DelayedItem(5000L), (ReplicaState) Fetching$.MODULE$))
        };
        AbstractFetcherThread.ResultWithPartitions thirdResult = thread.buildFetch((Map) Map$.MODULE$.apply((Seq) Predef$.MODULE$.wrapRefArray((Object[]) allDelayedStates)));
        if (thirdResult == null) {
            throw new MatchError((Object) thirdResult);
        }
        Option thirdRequestOpt = (Option) thirdResult.result();
        Set thirdErrors = thirdResult.partitionsWithError();
        Assert.assertTrue("Expected no fetch requests since all partitions are delayed", thirdRequestOpt.isEmpty());
        Assert.assertFalse(thirdErrors.nonEmpty());
    }

    /**
     * Records, on {@code replicaManager}, the replica and partition lookups for both test
     * partitions (t1p0 first, then t1p1). Every lookup may be called any number of times.
     *
     * @param replicaT1p0    replica returned for t1p0 lookups
     * @param replicaT1p1    replica returned for t1p1 lookups
     * @param futureReplica  future replica returned for both partitions
     * @param partition      partition returned for both partitions
     * @param replicaManager mock, still in record state, that receives the expectations
     * @return the expectation setter of the last recorded call, {@code getPartition(t1p1)}
     */
    public IExpectationSetters<Option<Partition>> stub(Replica replicaT1p0, Replica replicaT1p1, Replica futureReplica, Partition partition, ReplicaManager replicaManager) {
        this.stubLookupsFor(this.t1p0(), replicaT1p0, futureReplica, partition, replicaManager);
        return this.stubLookupsFor(this.t1p1(), replicaT1p1, futureReplica, partition, replicaManager);
    }

    /**
     * Records the five lookups for one topic partition. The expectations are left typed
     * (no erasing casts) so the returned setter matches the declared return type.
     */
    private IExpectationSetters<Option<Partition>> stubLookupsFor(TopicPartition topicPartition, Replica replica, Replica futureReplica, Partition partition, ReplicaManager replicaManager) {
        EasyMock.expect(replicaManager.localReplica(topicPartition)).andReturn(new Some<Replica>(replica)).anyTimes();
        EasyMock.expect(replicaManager.futureLocalReplica(topicPartition)).andReturn(new Some<Replica>(futureReplica)).anyTimes();
        EasyMock.expect(replicaManager.localReplicaOrException(topicPartition)).andReturn(replica).anyTimes();
        EasyMock.expect(replicaManager.futureLocalReplicaOrException(topicPartition)).andReturn(futureReplica).anyTimes();
        return EasyMock.expect(replicaManager.getPartition(topicPartition)).andReturn(new Some<Partition>(partition)).anyTimes();
    }

    /**
     * Records the lookups from {@code stub} and additionally stubs
     * {@code replicaManager.fetchMessages} for any arguments, any number of times.
     *
     * <p>Each stubbed {@code fetchMessages} call captures the response callback it was
     * given and immediately invokes it with an empty sequence.
     *
     * @param replicaT1p0      replica returned for t1p0 lookups
     * @param replicaT1p1      replica returned for t1p1 lookups
     * @param futureReplica    future replica returned for both partitions
     * @param partition        partition returned for both partitions
     * @param replicaManager   mock, still in record state, that receives the expectations
     * @param responseCallback capture that receives the callback passed to {@code fetchMessages}
     * @return the expectation setter for the {@code fetchMessages} call
     */
    public IExpectationSetters<BoxedUnit> stubWithFetchMessages(Replica replicaT1p0, Replica replicaT1p1, Replica futureReplica, Partition partition, ReplicaManager replicaManager, Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> responseCallback) {
        this.stub(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager);

        // Final alias so the anonymous answer below can read the capture.
        final Capture<Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit>> callbackCapture = responseCallback;

        // fetchMessages is recorded as a bare call; the expectation is then attached to it
        // with expectLastCall().
        replicaManager.fetchMessages(EasyMock.anyLong(), EasyMock.anyInt(), EasyMock.anyInt(), EasyMock.anyInt(), BoxesRunTime.unboxToBoolean((Object)EasyMock.anyObject()), (Seq)EasyMock.anyObject(), (ReplicaQuota)EasyMock.anyObject(), (Function1)EasyMock.capture(responseCallback), (IsolationLevel)EasyMock.anyObject());
        return EasyMock.<BoxedUnit>expectLastCall().andAnswer(new IAnswer<BoxedUnit>() {
            @Override
            public BoxedUnit answer() {
                // Reply to the fetch with no partition data.
                ((Function1) callbackCapture.getValue()).apply((Object) Seq$.MODULE$.empty());
                return BoxedUnit.UNIT;
            }
        }).anyTimes();
    }
}

