/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Optional;
import kafka.cluster.Partition;
import kafka.server.DelayedFetch;
import kafka.server.FetchPartitionStatus;
import kafka.server.LogReadResult;
import kafka.server.LogReadResult$;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005a\u0001\u0002\b\u0010\u0001QAQa\u0007\u0001\u0005\u0002qAqa\b\u0001C\u0002\u0013%\u0001\u0005\u0003\u0004%\u0001\u0001\u0006I!\t\u0005\bK\u0001\u0011\r\u0011\"\u0003'\u0011\u0019Q\u0003\u0001)A\u0005O!91\u0006\u0001b\u0001\n\u0013a\u0003B\u0002\u0019\u0001A\u0003%Q\u0006C\u00032\u0001\u0011\u0005!\u0007C\u0003D\u0001\u0011\u0005!\u0007C\u0003F\u0001\u0011\u0005!\u0007C\u0003H\u0001\u0011%\u0001\nC\u0003[\u0001\u0011%1\fC\u0003{\u0001\u0011%1P\u0001\tEK2\f\u00170\u001a3GKR\u001c\u0007\u000eV3ti*\u0011\u0001#E\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003I\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001+A\u0011a#G\u0007\u0002/)\t\u0001$A\u0003tG\u0006d\u0017-\u0003\u0002\u001b/\t1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#A\u000f\u0011\u0005y\u0001Q\"A\b\u0002\u00115\f\u0007PQ=uKN,\u0012!\t\t\u0003-\tJ!aI\f\u0003\u0007%sG/A\u0005nCb\u0014\u0015\u0010^3tA\u0005q!/\u001a9mS\u000e\fW*\u00198bO\u0016\u0014X#A\u0014\u0011\u0005yA\u0013BA\u0015\u0010\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJ\fqB]3qY&\u001c\u0017-T1oC\u001e,'\u000fI\u0001\re\u0016\u0004H.[2b#V|G/Y\u000b\u0002[A\u0011aDL\u0005\u0003_=\u0011ABU3qY&\u001c\u0017-U;pi\u0006\fQB]3qY&\u001c\u0017-U;pi\u0006\u0004\u0013\u0001\u0007;fgR4U\r^2i/&$\bNR3oG\u0016$W\t]8dQR\t1\u0007\u0005\u0002\u0017i%\u0011Qg\u0006\u0002\u0005+:LG\u000f\u000b\u0002\toA\u0011\u0001(Q\u0007\u0002s)\u0011!hO\u0001\u0004CBL'B\u0001\u001f>\u0003\u001dQW\u000f]5uKJT!AP \u0002\u000b),h.\u001b;\u000b\u0003\u0001\u000b1a\u001c:h\u0013\t\u0011\u0015H\u0001\u0003UKN$\u0018a\u0006;fgRtu\u000e\u001e'fC\u0012,'o\u0014:G_2dwn^3sQ\tIq'\u0001\nuKN$H)\u001b<fe\u001eLgnZ#q_\u000eD\u0007F\u0001\u00068\u0003a\u0011W/\u001b7e\r>dGn\\<fe\u001a+Go\u00195QCJ\fWn\u001d\u000b\u0004\u0013ZC\u0006C\u0001&U\u001b\u0005Y%B\u0001'N\u0003\rawn\u001a\u0006\u0003\u001d>\u000b\u0011\"\u001b8uKJt\u0017\r\\:\u000b\u0005A\u000b\u0016aB:u_J\fw-\u001a\u0006\u0003%IS!aU 
\u0002\r\u0005\u0004\u0018m\u00195f\u0013\t)6JA\u0006GKR\u001c\u0007\u000eU1sC6\u001c\b\"B,\f\u0001\u0004\t\u0013!\u0003:fa2L7-Y%e\u0011\u0015I6\u00021\u0001\"\u0003%i\u0017\r_,bSRl5/A\u000bfqB,7\r\u001e*fC\u00124%o\\7SKBd\u0017nY1\u0015\u000bMbfL\u001a:\t\u000buc\u0001\u0019A%\u0002\u0017\u0019,Go\u00195QCJ\fWn\u001d\u0005\u0006?2\u0001\r\u0001Y\u0001\u0011i>\u0004\u0018nY%e!\u0006\u0014H/\u001b;j_:\u0004\"!\u00193\u000e\u0003\tT!aY)\u0002\r\r|W.\\8o\u0013\t)'M\u0001\tU_BL7-\u00133QCJ$\u0018\u000e^5p]\")q\r\u0004a\u0001Q\u0006\u0011b-\u001a;dQB\u000b'\u000f^5uS>tG)\u0019;b!\tIwN\u0004\u0002k[6\t1N\u0003\u0002mE\u0006A!/Z9vKN$8/\u0003\u0002oW\u0006aa)\u001a;dQJ+\u0017/^3ti&\u0011\u0001/\u001d\u0002\u000e!\u0006\u0014H/\u001b;j_:$\u0015\r^1\u000b\u00059\\\u0007\"B:\r\u0001\u0004!\u0018!B3se>\u0014\bCA;y\u001b\u00051(BA<c\u0003!\u0001(o\u001c;pG>d\u0017BA=w\u0005\u0019)%O]8sg\u0006y!-^5mIJ+\u0017\r\u001a*fgVdG\u000f\u0006\u0002}\u007fB\u0011a$`\u0005\u0003}>\u0011Q\u0002T8h%\u0016\fGMU3tk2$\b\"B:\u000e\u0001\u0004!\b")
public class DelayedFetchTest {
    private final int maxBytes;
    private final ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
    private final ReplicaQuota replicaQuota = (ReplicaQuota)Mockito.mock(ReplicaQuota.class);

    private int maxBytes() {
        return this.maxBytes;
    }

    private ReplicaManager replicaManager() {
        return this.replicaManager;
    }

    private ReplicaQuota replicaQuota() {
        return this.replicaQuota;
    }

    @Test
    public void testFetchWithFencedEpoch() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic");
        long fetchOffset = 500L;
        long logStartOffset = 0L;
        Optional<Integer> currentLeaderEpoch = Optional.of(Predef$.MODULE$.int2Integer(10));
        int replicaId = 1;
        FetchPartitionStatus fetchStatus = new FetchPartitionStatus(new LogOffsetMetadata(fetchOffset), new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, this.maxBytes(), currentLeaderEpoch));
        FetchParams fetchParams = this.buildFollowerFetchParams(replicaId, 500);
        ObjectRef fetchResultOpt = ObjectRef.create((Object)None$.MODULE$);
        DelayedFetch delayedFetch = new DelayedFetch(fetchParams, (Seq)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicIdPartition), (Object)fetchStatus), (List)Nil$.MODULE$), this.replicaManager(), this.replicaQuota(), (Function1 & Serializable & scala.Serializable)responses -> {
            DelayedFetchTest.callback$1(responses, fetchResultOpt);
            return BoxedUnit.UNIT;
        });
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)this.replicaManager().getPartitionOrException(topicIdPartition.topicPartition())).thenReturn((Object)partition);
        Mockito.when((Object)partition.fetchOffsetSnapshot(currentLeaderEpoch, true)).thenThrow(new Throwable[]{new FencedLeaderEpochException("Requested epoch has been fenced")});
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)this.replicaManager().isAddingReplica((TopicPartition)ArgumentMatchers.any(), ArgumentMatchers.anyInt()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        this.expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo(), Errors.FENCED_LEADER_EPOCH);
        Assertions.assertTrue((boolean)delayedFetch.tryComplete());
        Assertions.assertTrue((boolean)delayedFetch.isCompleted());
        Assertions.assertTrue((boolean)((Option)fetchResultOpt.elem).isDefined());
        FetchPartitionData fetchResult = (FetchPartitionData)((Option)fetchResultOpt.elem).get();
        Assertions.assertEquals((Object)Errors.FENCED_LEADER_EPOCH, (Object)fetchResult.error);
    }

    @Test
    public void testNotLeaderOrFollower() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic");
        long fetchOffset = 500L;
        long logStartOffset = 0L;
        Optional<Integer> currentLeaderEpoch = Optional.of(Predef$.MODULE$.int2Integer(10));
        int replicaId = 1;
        FetchPartitionStatus fetchStatus = new FetchPartitionStatus(new LogOffsetMetadata(fetchOffset), new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, this.maxBytes(), currentLeaderEpoch));
        FetchParams fetchParams = this.buildFollowerFetchParams(replicaId, 500);
        ObjectRef fetchResultOpt = ObjectRef.create((Object)None$.MODULE$);
        DelayedFetch delayedFetch = new DelayedFetch(fetchParams, (Seq)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicIdPartition), (Object)fetchStatus), (List)Nil$.MODULE$), this.replicaManager(), this.replicaQuota(), (Function1 & Serializable & scala.Serializable)responses -> {
            DelayedFetchTest.callback$2(responses, fetchResultOpt);
            return BoxedUnit.UNIT;
        });
        Mockito.when((Object)this.replicaManager().getPartitionOrException(topicIdPartition.topicPartition())).thenThrow(new Throwable[]{new NotLeaderOrFollowerException(new StringBuilder(26).append("Replica for ").append(topicIdPartition).append(" not available").toString())});
        this.expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo(), Errors.NOT_LEADER_OR_FOLLOWER);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)this.replicaManager().isAddingReplica((TopicPartition)ArgumentMatchers.any(), ArgumentMatchers.anyInt()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Assertions.assertTrue((boolean)delayedFetch.tryComplete());
        Assertions.assertTrue((boolean)delayedFetch.isCompleted());
        Assertions.assertTrue((boolean)((Option)fetchResultOpt.elem).isDefined());
    }

    @Test
    public void testDivergingEpoch() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic");
        long fetchOffset = 500L;
        long logStartOffset = 0L;
        Optional<Integer> currentLeaderEpoch = Optional.of(Predef$.MODULE$.int2Integer(10));
        Optional<Integer> lastFetchedEpoch = Optional.of(Predef$.MODULE$.int2Integer(9));
        int replicaId = 1;
        FetchPartitionStatus fetchStatus = new FetchPartitionStatus(new LogOffsetMetadata(fetchOffset), new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, this.maxBytes(), currentLeaderEpoch, lastFetchedEpoch));
        FetchParams fetchParams = this.buildFollowerFetchParams(replicaId, 500);
        ObjectRef fetchResultOpt = ObjectRef.create((Object)None$.MODULE$);
        DelayedFetch delayedFetch = new DelayedFetch(fetchParams, (Seq)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicIdPartition), (Object)fetchStatus), (List)Nil$.MODULE$), this.replicaManager(), this.replicaQuota(), (Function1 & Serializable & scala.Serializable)responses -> {
            DelayedFetchTest.callback$3(responses, fetchResultOpt);
            return BoxedUnit.UNIT;
        });
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)this.replicaManager().getPartitionOrException(topicIdPartition.topicPartition())).thenReturn((Object)partition);
        LogOffsetMetadata endOffsetMetadata = new LogOffsetMetadata(500L, 0L, 500);
        Mockito.when((Object)partition.fetchOffsetSnapshot(currentLeaderEpoch, true)).thenReturn((Object)new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(currentLeaderEpoch, Predef$.MODULE$.Integer2int(lastFetchedEpoch.get()), false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(topicIdPartition.partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(Predef$.MODULE$.Integer2int(lastFetchedEpoch.get())).setEndOffset(fetchOffset - 1L));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)this.replicaManager().isAddingReplica((TopicPartition)ArgumentMatchers.any(), ArgumentMatchers.anyInt()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        this.expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo(), Errors.NONE);
        Assertions.assertTrue((boolean)delayedFetch.tryComplete());
        Assertions.assertTrue((boolean)delayedFetch.isCompleted());
        Assertions.assertTrue((boolean)((Option)fetchResultOpt.elem).isDefined());
    }

    private FetchParams buildFollowerFetchParams(int replicaId, int maxWaitMs) {
        return new FetchParams(ApiKeys.FETCH.latestVersion(), replicaId, 1L, (long)maxWaitMs, 1, this.maxBytes(), FetchIsolation.LOG_END, Optional.empty());
    }

    private void expectReadFromReplica(FetchParams fetchParams, TopicIdPartition topicIdPartition, FetchRequest.PartitionData fetchPartitionData, Errors error) {
        Mockito.when((Object)this.replicaManager().readFromLocalLog(fetchParams, (Seq)new .colon.colon((Object)new Tuple2((Object)topicIdPartition, (Object)fetchPartitionData), (List)Nil$.MODULE$), this.replicaQuota(), true)).thenReturn((Object)new .colon.colon((Object)new Tuple2((Object)topicIdPartition, (Object)this.buildReadResult(error)), (List)Nil$.MODULE$));
    }

    private LogReadResult buildReadResult(Errors error) {
        Errors errors = error;
        Errors errors2 = Errors.NONE;
        None$ x$1 = (errors == null ? errors2 != null : !errors.equals(errors2)) ? new Some((Object)error.exception()) : None$.MODULE$;
        FetchDataInfo x$2 = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, (Records)MemoryRecords.EMPTY);
        None$ x$3 = None$.MODULE$;
        long x$4 = -1L;
        long x$5 = -1L;
        long x$6 = -1L;
        long x$7 = -1L;
        long x$8 = -1L;
        None$ x$9 = None$.MODULE$;
        Option x$10 = LogReadResult$.MODULE$.apply$default$9();
        return new LogReadResult(x$2, (Option)x$3, x$4, x$5, x$6, x$7, x$8, (Option)x$9, x$10, (Option)x$1);
    }

    private static final void callback$1(Seq responses, ObjectRef fetchResultOpt$1) {
        fetchResultOpt$1.elem = new Some(((Tuple2)responses.head())._2());
    }

    private static final void callback$2(Seq responses, ObjectRef fetchResultOpt$2) {
        fetchResultOpt$2.elem = new Some(((Tuple2)responses.head())._2());
    }

    private static final void callback$3(Seq responses, ObjectRef fetchResultOpt$3) {
        fetchResultOpt$3.elem = new Some(((Tuple2)responses.head())._2());
    }

    public DelayedFetchTest() {
        this.maxBytes = 1024;
    }
}

