/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import kafka.cluster.Replica;
import kafka.server.DelayedFetchMetrics$;
import kafka.server.DelayedOperation;
import kafka.server.FetchMetadata;
import kafka.server.FetchPartitionData;
import kafka.server.FetchPartitionStatus;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.FetchRequest;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.IntRef;
import scala.runtime.NonLocalReturnControl;

@ScalaSignature(bytes="\u0006\u0001]3A!\u0001\u0002\u0001\u000f\taA)\u001a7bs\u0016$g)\u001a;dQ*\u00111\u0001B\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001\u0011A\u0011\u0011BC\u0007\u0002\u0005%\u00111B\u0001\u0002\u0011\t\u0016d\u0017-_3e\u001fB,'/\u0019;j_:D\u0011\"\u0004\u0001\u0003\u0002\u0003\u0006IA\u0004\u000b\u0002\u000f\u0011,G.Y=NgB\u0011qBE\u0007\u0002!)\t\u0011#A\u0003tG\u0006d\u0017-\u0003\u0002\u0014!\t!Aj\u001c8h\u0013\ti!\u0002\u0003\u0005\u0017\u0001\t\u0005\t\u0015!\u0003\u0018\u000351W\r^2i\u001b\u0016$\u0018\rZ1uCB\u0011\u0011\u0002G\u0005\u00033\t\u0011QBR3uG\"lU\r^1eCR\f\u0007\u0002C\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feB\u0011\u0011\"H\u0005\u0003=\t\u0011aBU3qY&\u001c\u0017-T1oC\u001e,'\u000f\u0003\u0005!\u0001\t\u0005\t\u0015!\u0003\"\u0003\u0015\tXo\u001c;b!\tI!%\u0003\u0002$\u0005\ta!+\u001a9mS\u000e\f\u0017+^8uC\"AQ\u0005\u0001B\u0001B\u0003%a%\u0001\tsKN\u0004xN\\:f\u0007\u0006dGNY1dWB!qbJ\u0015A\u0013\tA\u0003CA\u0005Gk:\u001cG/[8ocA\u0019!&L\u0018\u000e\u0003-R!\u0001\f\t\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002/W\t\u00191+Z9\u0011\t=\u0001$'P\u0005\u0003cA\u0011a\u0001V;qY\u0016\u0014\u0004CA\u001a<\u001b\u0005!$BA\u001b7\u0003\u0019\u0019w.\\7p]*\u0011Qa\u000e\u0006\u0003qe\na!\u00199bG\",'\"\u0001\u001e\u0002\u0007=\u0014x-\u0003\u0002=i\tqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007CA\u0005?\u0013\ty$A\u0001\nGKR\u001c\u0007\u000eU1si&$\u0018n\u001c8ECR\f\u0007CA\bB\u0013\t\u0011\u0005C\u0001\u0003V]&$\b\"\u0002#\u0001\t\u0003)\u0015A\u0002\u001fj]&$h\b\u0006\u0004G\u000f\"K%j\u0013\t\u0003\u0013\u0001AQ!D\"A\u00029AQAF\"A\u0002]AQaG\"A\u0002qAQ\u0001I\"A\u0002\u0005BQ!J\"A\u0002\u0019BQ!\u0014\u0001\u0005B9\u000b1\u0002\u001e:z\u0007>l\u0007\u000f\\3uKR\tq\n\u0005\u0002\u0010!&\u0011\u0011\u000b\u0005\u0002\b\u0005>|G.Z1o\u0011\u0015\u0019\u0006\u0001\"\u0011U\u00031yg.\u0012=qSJ\fG/[8o)\u0005\u0001\u0005\"\u0002,\u0001\t\u0
003\"\u0016AC8o\u0007>l\u0007\u000f\\3uK\u0002")
/*
 * A delayed operation wrapping a fetch request.
 *
 * tryComplete() inspects every partition in the fetch metadata and forces completion when
 * enough bytes have accumulated or when a partition-level condition demands an immediate
 * answer. onComplete() reads from the local log through the replica manager and hands the
 * per-partition results to the response callback. onExpiration() marks an expiry meter.
 *
 * NOTE(review): this file is decompiler output of a Scala class; the early exits out of the
 * per-partition lambda are modelled with NonLocalReturnControl, as in the compiled bytecode.
 */
public class DelayedFetch
extends DelayedOperation {
    private final FetchMetadata fetchMetadata;
    private final ReplicaManager replicaManager;
    private final ReplicaQuota quota;
    private final Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit> responseCallback;

    /**
     * Checks whether this fetch can be completed now.
     *
     * <p>Each partition status is passed to {@link #$anonfun$tryComplete$1}, which either
     * accumulates available bytes or requests an early return by throwing a
     * {@code NonLocalReturnControl} tagged with a key created here. If no early return
     * happens, completion is forced when the unthrottled bytes reach {@code fetchMinBytes},
     * or when unthrottled plus throttled bytes reach it and the quota is not exceeded.
     *
     * @return the result of {@code forceComplete()} when completion was attempted (either
     *         here or through an early return), otherwise {@code false}
     */
    @Override
    public boolean tryComplete() {
        // Identity token used to recognise non-local returns raised by this invocation only.
        Object nonLocalReturnKey = new Object();
        try {
            IntRef accumulatedSize = IntRef.create((int)0);
            IntRef accumulatedThrottledSize = IntRef.create((int)0);
            this.fetchMetadata.fetchPartitionStatus().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                DelayedFetch.$anonfun$tryComplete$1(this, accumulatedSize, accumulatedThrottledSize, nonLocalReturnKey, (Tuple2)x0$1);
                return BoxedUnit.UNIT;
            });
            boolean enoughUnthrottled = accumulatedSize.elem >= this.fetchMetadata.fetchMinBytes();
            if (enoughUnthrottled
                    || (accumulatedSize.elem + accumulatedThrottledSize.elem >= this.fetchMetadata.fetchMinBytes()
                        && !this.quota.isQuotaExceeded())) {
                return this.forceComplete();
            }
            return false;
        }
        catch (NonLocalReturnControl ex) {
            // Only a control exception carrying our own key is an early return from this
            // method; anything else belongs to another frame and must keep propagating.
            if (ex.key() != nonLocalReturnKey) {
                throw ex;
            }
            return ex.value$mcZ$sp();
        }
    }

    /**
     * Marks the follower or consumer expired-request meter, depending on whether the fetch
     * metadata reports the request as coming from a follower.
     */
    @Override
    public void onExpiration() {
        if (this.fetchMetadata.isFromFollower()) {
            DelayedFetchMetrics$.MODULE$.followerExpiredRequestMeter().mark();
        } else {
            DelayedFetchMetrics$.MODULE$.consumerExpiredRequestMeter().mark();
        }
    }

    /**
     * Reads the requested partitions from the local log and passes one
     * {@code FetchPartitionData} per partition (error, high watermark, records) to the
     * response callback.
     */
    @Override
    public void onComplete() {
        // Project each (partition, status) pair onto (partition, fetchInfo) for the read call.
        Seq<Tuple2<TopicPartition, FetchRequest.PartitionData>> fetchInfos = (Seq<Tuple2<TopicPartition, FetchRequest.PartitionData>>)((Seq)this.fetchMetadata.fetchPartitionStatus().map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple2 entry = (Tuple2)x0$2;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            TopicPartition tp = (TopicPartition)entry._1();
            FetchPartitionStatus status = (FetchPartitionStatus)entry._2();
            return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)status.fetchInfo());
        }, Seq$.MODULE$.canBuildFrom()));
        Seq<Tuple2<TopicPartition, LogReadResult>> logReadResults = this.replicaManager.readFromLocalLog(this.fetchMetadata.replicaId(), this.fetchMetadata.fetchOnlyLeader(), this.fetchMetadata.fetchOnlyCommitted(), this.fetchMetadata.fetchMaxBytes(), this.fetchMetadata.hardMaxBytesLimit(), fetchInfos, this.quota);
        // Convert each read result into the payload handed to the callback.
        Seq fetchPartitionData = (Seq)logReadResults.map((Function1 & Serializable & scala.Serializable)x0$3 -> {
            Tuple2 entry = (Tuple2)x0$3;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            TopicPartition tp = (TopicPartition)entry._1();
            LogReadResult result = (LogReadResult)entry._2();
            return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)new FetchPartitionData(result.error(), result.hw(), result.info().records()));
        }, Seq$.MODULE$.canBuildFrom());
        this.responseCallback.apply((Object)fetchPartitionData);
    }

    /**
     * Per-partition body of {@link #tryComplete()}.
     *
     * <p>Partitions whose start offset metadata equals {@code UnknownOffsetMetadata}, or whose
     * end offset has the same message offset as the fetch offset, are skipped. Otherwise
     * exactly one of the following applies, in this order:
     * <ol>
     *   <li>the end offset is on an older segment than the fetch offset: force completion and
     *       return early;</li>
     *   <li>the fetch offset is on an older segment than the end offset: force completion and
     *       return early unless the leader should throttle this replica;</li>
     *   <li>the fetch offset is smaller than the end offset: add the available bytes (capped
     *       by the partition's {@code maxBytes}) to the throttled or the unthrottled
     *       accumulator, depending on whether the partition is throttled.</li>
     * </ol>
     * An early return is signalled by throwing a {@code NonLocalReturnControl} carrying
     * {@code nonLocalReturnKey1$1} and the result of {@code forceComplete()}. The same early
     * return is taken when the lookup raises {@code UnknownTopicOrPartitionException} or
     * {@code NotLeaderForPartitionException}.
     *
     * @param $this the delayed fetch being evaluated
     * @param accumulatedSize$1 running total of unthrottled bytes available
     * @param accumulatedThrottledSize$1 running total of throttled bytes available
     * @param nonLocalReturnKey1$1 key identifying the enclosing {@code tryComplete()} call
     * @param x0$1 a (TopicPartition, FetchPartitionStatus) pair; {@code null} raises MatchError
     */
    public static final /* synthetic */ void $anonfun$tryComplete$1(DelayedFetch $this, IntRef accumulatedSize$1, IntRef accumulatedThrottledSize$1, Object nonLocalReturnKey1$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        TopicPartition topicPartition = (TopicPartition)x0$1._1();
        FetchPartitionStatus fetchStatus = (FetchPartitionStatus)x0$1._2();
        LogOffsetMetadata fetchOffset = fetchStatus.startOffsetMetadata();
        // NOTE(review): the decompiler renders the boolean-specialised control class as
        // NonLocalReturnControl.mcZ.sp; its binary name is presumably
        // NonLocalReturnControl$mcZ$sp - confirm before compiling this file as Java.
        try {
            LogOffsetMetadata unknownOffset = LogOffsetMetadata$.MODULE$.UnknownOffsetMetadata();
            boolean offsetIsKnown = fetchOffset == null ? unknownOffset != null : !((Object)fetchOffset).equals(unknownOffset);
            if (!offsetIsKnown) {
                return;
            }
            Replica replica = $this.replicaManager.getLeaderReplicaIfLocal(topicPartition);
            LogOffsetMetadata endOffset = $this.fetchMetadata.fetchOnlyCommitted() ? replica.highWatermark() : replica.logEndOffset();
            if (endOffset.messageOffset() == fetchOffset.messageOffset()) {
                return;
            }
            // The three cases are mutually exclusive. In particular the byte accounting must
            // not run once the offsets are known to sit on different segments, and throttled
            // bytes must not also be counted as unthrottled.
            if (endOffset.onOlderSegment(fetchOffset)) {
                $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Satisfying fetch %s since it is fetching later segments of partition %s.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{$this.fetchMetadata, topicPartition})));
                throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
            } else if (fetchOffset.onOlderSegment(endOffset)) {
                $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Satisfying fetch %s immediately since it is fetching older segments.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{$this.fetchMetadata})));
                if (!$this.replicaManager.shouldLeaderThrottle($this.quota, topicPartition, $this.fetchMetadata.replicaId())) {
                    throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
                }
            } else if (fetchOffset.messageOffset() < endOffset.messageOffset()) {
                int bytesAvailable = package$.MODULE$.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo().maxBytes);
                if ($this.quota.isThrottled(topicPartition)) {
                    accumulatedThrottledSize$1.elem += bytesAvailable;
                } else {
                    accumulatedSize$1.elem += bytesAvailable;
                }
            }
        }
        catch (UnknownTopicOrPartitionException unknownTopicOrPartitionException) {
            $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Broker no longer know of %s, satisfy %s immediately")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition, $this.fetchMetadata})));
            throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
        catch (NotLeaderForPartitionException notLeaderForPartitionException) {
            $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Broker is no longer the leader of %s, satisfy %s immediately")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition, $this.fetchMetadata})));
            throw new NonLocalReturnControl.mcZ.sp(nonLocalReturnKey1$1, $this.forceComplete());
        }
    }

    /**
     * Creates a delayed fetch.
     *
     * @param delayMs delay passed to the {@code DelayedOperation} constructor
     * @param fetchMetadata metadata of the fetch request, including per-partition status
     * @param replicaManager used to look up leader replicas and to read from the local log
     * @param quota replication quota consulted for throttling decisions
     * @param responseCallback invoked from {@link #onComplete()} with the per-partition data
     */
    public DelayedFetch(long delayMs, FetchMetadata fetchMetadata, ReplicaManager replicaManager, ReplicaQuota quota, Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit> responseCallback) {
        // The superclass constructor call has to be the first statement of a Java constructor.
        // NOTE(review): assumes the superclass constructor does not read these fields through
        // overridden methods - confirm against DelayedOperation.
        super(delayMs);
        this.fetchMetadata = fetchMetadata;
        this.replicaManager = replicaManager;
        this.quota = quota;
        this.responseCallback = responseCallback;
    }
}

