/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.net.SocketTimeoutException;
import java.util.LinkedHashMap;
import kafka.admin.AdminUtils$;
import kafka.api.KAFKA_0_10_0_IV0$;
import kafka.api.KAFKA_0_10_1_IV1$;
import kafka.api.KAFKA_0_10_1_IV2$;
import kafka.api.KAFKA_0_9_0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Replica;
import kafka.common.KafkaStorageException;
import kafka.log.Log;
import kafka.log.LogConfig$;
import kafka.server.AbstractFetcherThread;
import kafka.server.ConfigType$;
import kafka.server.KafkaConfig;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.PartitionFetchState;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicationQuotaManager;
import kafka.utils.NetworkClientBlockingOps$;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

@ScalaSignature(bytes="\u0006\u0001\teg\u0001B\u0001\u0003\u0001\u001d\u0011ACU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$'BA\u0002\u0005\u0003\u0019\u0019XM\u001d<fe*\tQ!A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001A\u0001CA\u0005\u000b\u001b\u0005\u0011\u0011BA\u0006\u0003\u0005U\t%m\u001d;sC\u000e$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012D\u0011\"\u0004\u0001\u0003\u0002\u0003\u0006IAD\u000e\u0002\t9\fW.\u001a\t\u0003\u001faq!\u0001\u0005\f\u0011\u0005E!R\"\u0001\n\u000b\u0005M1\u0011A\u0002\u001fs_>$hHC\u0001\u0016\u0003\u0015\u00198-\u00197b\u0013\t9B#\u0001\u0004Qe\u0016$WMZ\u0005\u00033i\u0011aa\u0015;sS:<'BA\f\u0015\u0013\tiA$\u0003\u0002\u001e=\t\u00112\u000b[;uI><h.\u00192mKRC'/Z1e\u0015\tyB!A\u0003vi&d7\u000f\u0003\u0005\"\u0001\t\u0005\t\u0015!\u0003#\u0003%1W\r^2iKJLE\r\u0005\u0002$I5\tA#\u0003\u0002&)\t\u0019\u0011J\u001c;\t\u0011\u001d\u0002!\u0011!Q\u0001\n!\nAb]8ve\u000e,'I]8lKJ\u0004\"!\u000b\u0017\u000e\u0003)R!a\u000b\u0003\u0002\u000f\rdWo\u001d;fe&\u0011QF\u000b\u0002\u000f\u0005J|7.\u001a:F]\u0012\u0004v.\u001b8u\u0011!y\u0003A!A!\u0002\u0013\u0001\u0014\u0001\u00042s_.,'oQ8oM&<\u0007CA\u00052\u0013\t\u0011$AA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\u0002\u0003\u001b\u0001\u0005\u0003\u0005\u000b\u0011B\u001b\u0002\u0015I,\u0007\u000f\\5dC6;'\u000f\u0005\u0002\nm%\u0011qG\u0001\u0002\u000f%\u0016\u0004H.[2b\u001b\u0006t\u0017mZ3s\u0011!I\u0004A!A!\u0002\u0013Q\u0014aB7fiJL7m\u001d\t\u0003w\u0011k\u0011\u0001\u0010\u0006\u0003suR!AP \u0002\r\r|W.\\8o\u0015\t)\u0001I\u0003\u0002B\u0005\u00061\u0011\r]1dQ\u0016T\u0011aQ\u0001\u0004_J<\u0017BA#=\u0005\u001diU\r\u001e:jGND\u0001b\u0012\u0001\u0003\u0002\u0003\u0006I\u0001S\u0001\u0005i&lW\r\u0005\u0002J\u00176\t!J\u0003\u0002 {%\u0011AJ\u0013\u0002\u0005)&lW\r\u0003\u0005O\u0001\t\u0005\t\u0015!\u0003P\u0003\u0015\tXo\u001c;b!\tI\u0001+\u0003\u0002R\u0005\t9\"+\u001a9mS\u000e\fG/[8o#V|G/Y'b]\u0006<WM\u001d\u0005\u0006'\u0002!\t\u0001V\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u0013U3v\u000bW-[7rk\u0006CA\u0005\u0001\u0011\u0015i!\u000b1\u0001\u000f\u0011\u0015\t#\u000b1\u0001#\u0011\u00159#\u000b1\u0001)\u0011\u0015y#\u000b1\u00011\u0011\u0015!$\u000b1\u00016\u0011\u0015I$\u000b1\u0001;\u0011\u00159%\u000b1\u0001I\u0011\u0015q%\u000b1\u0001P\u000b\u0011y\u0006\u0001\u00011\u0003\u0007I+\u0015\u000b\u0005\u0002bY:\u0011\u0011BY\u0004\u0006G\nA\t\u0001Z\u0001\u0015%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193\u0011\u0005%)g!B\u0001\u0003\u0011\u000317CA3h!\t\u0019\u0003.\u0003\u0002j)\t1\u0011I\\=SK\u001aDQaU3\u0005\u0002-$\u0012\u0001\u001a\u0004\u0006[\u0016\u0004!A\u001c\u0002\r\r\u0016$8\r\u001b*fcV,7\u000f^\n\u0004Y\u001e|\u0007C\u00019t\u001d\tI\u0011/\u0003\u0002s\u0005\u0005)\u0012IY:ue\u0006\u001cGOR3uG\",'\u000f\u00165sK\u0006$\u0017BA7u\u0015\t\u0011(\u0001\u0003\u0005wY\n\u0015\r\u0011\"\u0001x\u0003))h\u000eZ3sYfLgnZ\u000b\u0002qB\u0011\u0011p \b\u0003uvl\u0011a\u001f\u0006\u0003yv\n\u0001B]3rk\u0016\u001cHo]\u0005\u0003}n\fABR3uG\"\u0014V-];fgRLA!!\u0001\u0002\u0004\t9!)^5mI\u0016\u0014(B\u0001@|\u0011%\t9\u0001\u001cB\u0001B\u0003%\u00010A\u0006v]\u0012,'\u000f\\=j]\u001e\u0004\u0003BB*m\t\u0003\tY\u0001\u0006\u0003\u0002\u000e\u0005E\u0001cAA\bY6\tQ\r\u0003\u0004w\u0003\u0013\u0001\r\u0001\u001f\u0005\b\u0003+aG\u0011AA\f\u0003\u001dI7/R7qif,\"!!\u0007\u0011\u0007\r\nY\"C\u0002\u0002\u001eQ\u0011qAQ8pY\u0016\fg\u000eC\u0004\u0002\"1$\t!a\t\u0002\r=4gm]3u)\u0011\t)#a\u000b\u0011\u0007\r\n9#C\u0002\u0002*Q\u0011A\u0001T8oO\"A\u0011QFA\u0010\u0001\u0004\ty#\u0001\bu_BL7\rU1si&$\u0018n\u001c8\u0011\t\u0005E\u00121G\u0007\u0002{%\u0019\u0011QG\u001f\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\u001a9\u0011\u0011H3\u0001\u0005\u0005m\"!\u0004)beRLG/[8o\t\u0006$\u0018mE\u0003\u00028\u001d\fi\u0004E\u0002q\u0003\u007fI1!!\u000fu\u0011)1\u0018q\u0007BC\u0002\u0013\u0005\u00111I\u000b\u0003\u0003\u000b\u0002B!a\u0012\u0002N9\u0019!0!\u0013\n\u0007\u0005-30A\u0007GKR\u001c\u0007NU3ta>t7/Z\u0005\u0005\u0003s\tyEC\u0002\u0002
LmD1\"a\u0002\u00028\t\u0005\t\u0015!\u0003\u0002F!91+a\u000e\u0005\u0002\u0005UC\u0003BA,\u00033\u0002B!a\u0004\u00028!9a/a\u0015A\u0002\u0005\u0015\u0003\u0002CA/\u0003o!\t!a\u0018\u0002\u0013\u0015\u0014(o\u001c:D_\u0012,WCAA1!\r\u0019\u00131M\u0005\u0004\u0003K\"\"!B*i_J$\b\u0002CA5\u0003o!\t!a\u001b\u0002\u0013Q|'+Z2pe\u0012\u001cXCAA7!\u0011\ty'!\u001e\u000e\u0005\u0005E$bAA:{\u00051!/Z2pe\u0012LA!a\u001e\u0002r\tiQ*Z7pef\u0014VmY8sIND\u0001\"a\u001f\u00028\u0011\u0005\u0011QP\u0001\u000eQ&<\u0007nV1uKJl\u0017M]6\u0016\u0005\u0005\u0015\u0002\u0002CAA\u0003o!\t!a!\u0002\u0013\u0015D8-\u001a9uS>tWCAAC!\u0015\u0019\u0013qQAF\u0013\r\tI\t\u0006\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\u00055\u0015q\u0013\b\u0005\u0003\u001f\u000b\u0019JD\u0002\u0012\u0003#K\u0011!F\u0005\u0004\u0003+#\u0012a\u00029bG.\fw-Z\u0005\u0005\u00033\u000bYJA\u0005UQJ|w/\u00192mK*\u0019\u0011Q\u0013\u000b\u0006\r\u0005}\u0005\u0001AAQ\u0005\t\u0001F\tE\u0002b\u0003oA\u0011\"!*\u0001\u0005\u0004%I!a\u0018\u0002'\u0019,Go\u00195SKF,Xm\u001d;WKJ\u001c\u0018n\u001c8\t\u0011\u0005%\u0006\u0001)A\u0005\u0003C\nACZ3uG\"\u0014V-];fgR4VM]:j_:\u0004\u0003\"CAW\u0001\t\u0007I\u0011BAX\u00035\u0019xnY6fiRKW.Z8viV\t!\u0005C\u0004\u00024\u0002\u0001\u000b\u0011\u0002\u0012\u0002\u001dM|7m[3u)&lWm\\;uA!I\u0011q\u0017\u0001C\u0002\u0013%\u0011qV\u0001\ne\u0016\u0004H.[2b\u0013\u0012Dq!a/\u0001A\u0003%!%\u0001\u0006sKBd\u0017nY1JI\u0002B\u0011\"a0\u0001\u0005\u0004%I!!1\u0002\u000f5\f\u0007pV1jiV\u0011\u00111\u0019\t\u0005\u0003\u000b\fy-\u0004\u0002\u0002H*!\u0011\u0011ZAf\u0003\u0011a\u0017M\\4\u000b\u0005\u00055\u0017\u0001\u00026bm\u0006LA!!5\u0002H\n9\u0011J\u001c;fO\u0016\u0014\b\u0002CAk\u0001\u0001\u0006I!a1\u0002\u00115\f\u0007pV1ji\u0002B\u0011\"!7\u0001\u0005\u0004%I!!1\u0002\u00115LgNQ=uKND\u0001\"!8\u0001A\u0003%\u00111Y\u0001\n[&t')\u001f;fg\u0002B\u0011\"!9\u0001\u0005\u0004%I!!1\u0002\u00115\f\u0007PQ=uKND\u0001\"!:\u0001A\u0003%\u00111Y\u0001\n[\u0006D()\u001f;fg\u0002B\u0011\"!;\u0001\u
0005\u0004%I!!1\u0002\u0013\u0019,Go\u00195TSj,\u0007\u0002CAw\u0001\u0001\u0006I!a1\u0002\u0015\u0019,Go\u00195TSj,\u0007\u0005C\u0004\u0002r\u0002!I!a=\u0002\u0011\rd\u0017.\u001a8u\u0013\u0012,\u0012A\u0004\u0005\n\u0003o\u0004!\u0019!C\u0005\u0003s\f!b]8ve\u000e,gj\u001c3f+\t\tY\u0010\u0005\u0003\u00022\u0005u\u0018bAA\u0000{\t!aj\u001c3f\u0011!\u0011\u0019\u0001\u0001Q\u0001\n\u0005m\u0018aC:pkJ\u001cWMT8eK\u0002B\u0011Ba\u0002\u0001\u0005\u0004%IA!\u0003\u0002\u001b9,Go^8sW\u000ec\u0017.\u001a8u+\t\u0011Y\u0001\u0005\u0003\u0003\u000e\tMQB\u0001B\b\u0015\r\u0011\tbP\u0001\bG2LWM\u001c;t\u0013\u0011\u0011)Ba\u0004\u0003\u001b9+Go^8sW\u000ec\u0017.\u001a8u\u0011!\u0011I\u0002\u0001Q\u0001\n\t-\u0011A\u00048fi^|'o[\"mS\u0016tG\u000f\t\u0005\b\u0005;\u0001A\u0011\tB\u0010\u0003!\u0019\b.\u001e;e_^tGC\u0001B\u0011!\r\u0019#1E\u0005\u0004\u0005K!\"\u0001B+oSRDqA!\u000b\u0001\t\u0003\u0011Y#\u0001\u000bqe>\u001cWm]:QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u000b\t\u0005C\u0011iCa\f\u00034!A\u0011Q\u0006B\u0014\u0001\u0004\ty\u0003\u0003\u0005\u00032\t\u001d\u0002\u0019AA\u0013\u0003-1W\r^2i\u001f\u001a47/\u001a;\t\u0011\tU\"q\u0005a\u0001\u0003C\u000bQ\u0002]1si&$\u0018n\u001c8ECR\f\u0007b\u0002B\u001d\u0001\u0011\u0005!1H\u0001\u001c[\u0006L(-Z,be:Lem\u0014<feNL'0\u001a3SK\u000e|'\u000fZ:\u0015\r\t\u0005\"Q\bB!\u0011!\u0011yDa\u000eA\u0002\u00055\u0014a\u0002:fG>\u0014Hm\u001d\u0005\t\u0003[\u00119\u00041\u0001\u00020!9!Q\t\u0001\u0005\u0002\t\u001d\u0013A\u00065b]\u0012dWm\u00144gg\u0016$x*\u001e;PMJ\u000bgnZ3\u0015\t\u0005\u0015\"\u0011\n\u0005\t\u0003[\u0011\u0019\u00051\u0001\u00020!9!Q\n\u0001\u0005\u0002\t=\u0013A\u00075b]\u0012dW\rU1si&$\u0018n\u001c8t/&$\b.\u0012:s_J\u001cH\u0003\u0002B\u0011\u0005#B\u0001Ba\u0015\u0003L\u0001\u0007!QK\u0001\u000ba\u0006\u0014H/\u001b;j_:\u001c\bCBAG\u0005/\ny#\u0003\u0003\u0003Z\u0005m%\u0001C%uKJ\f'\r\\3\t\u000f\tu\u0003\u0001\"\u0005\u0003`\u0005)a-\u001a;dQR!!\u0011\rB7!\u0019\tiIa\u0019\u0003h%!!QMAN\u0005\r\u0019V-\u001
d\t\bG\t%\u0014qFAQ\u0013\r\u0011Y\u0007\u0006\u0002\u0007)V\u0004H.\u001a\u001a\t\u000f\t=$1\fa\u0001A\u0006aa-\u001a;dQJ+\u0017/^3ti\"9!1\u000f\u0001\u0005\n\tU\u0014aC:f]\u0012\u0014V-];fgR$BAa\u001e\u0003~A!!Q\u0002B=\u0013\u0011\u0011YHa\u0004\u0003\u001d\rc\u0017.\u001a8u%\u0016\u001c\bo\u001c8tK\"A!q\u0010B9\u0001\u0004\u0011\t)\u0001\bsKF,Xm\u001d;Ck&dG-\u001a:1\t\t\r%1\u0013\t\u0007\u0005\u000b\u0013YIa$\u000f\u0007i\u00149)C\u0002\u0003\nn\fq\"\u00112tiJ\f7\r\u001e*fcV,7\u000f^\u0005\u0005\u0003\u0003\u0011iIC\u0002\u0003\nn\u0004BA!%\u0003\u00142\u0001A\u0001\u0004BK\u0005{\n\t\u0011!A\u0003\u0002\t]%aA0%cE!!\u0011\u0014BP!\r\u0019#1T\u0005\u0004\u0005;#\"a\u0002(pi\"Lgn\u001a\t\u0004u\n\u0005\u0016b\u0001BRw\ny\u0011IY:ue\u0006\u001cGOU3rk\u0016\u001cH\u000fC\u0004\u0003(\u0002!IA!+\u0002-\u0015\f'\u000f\\5fgR|%\u000fT1uKN$xJ\u001a4tKR$\u0002\"!\n\u0003,\n5&\u0011\u0017\u0005\t\u0003[\u0011)\u000b1\u0001\u00020!A!q\u0016BS\u0001\u0004\t)#\u0001\tfCJd\u0017.Z:u\u001fJd\u0015\r^3ti\"9!1\u0017BS\u0001\u0004\u0011\u0013AC2p]N,X.\u001a:JI\"9!q\u0017\u0001\u0005\u0012\te\u0016!\u00052vS2$g)\u001a;dQJ+\u0017/^3tiR\u0019\u0001Ma/\t\u0011\tu&Q\u0017a\u0001\u0005\u007f\u000bA\u0002]1si&$\u0018n\u001c8NCB\u0004b!!$\u0003d\t\u0005\u0007cB\u0012\u0003j\u0005=\"1\u0019\t\u0004\u0013\t\u0015\u0017b\u0001Bd\u0005\t\u0019\u0002+\u0019:uSRLwN\u001c$fi\u000eD7\u000b^1uK\"9!1\u001a\u0001\u0005\n\t5\u0017AF:i_VdGMR8mY><XM\u001d+ie>$H\u000f\\3\u0015\r\u0005e!q\u001aBl\u0011\u001dq%\u0011\u001aa\u0001\u0005#\u00042!\u0003Bj\u0013\r\u0011)N\u0001\u0002\r%\u0016\u0004H.[2b#V|G/\u0019\u0005\t\u0003[\u0011I\r1\u0001\u00020\u0001")
public class ReplicaFetcherThread
extends AbstractFetcherThread {
    /** Broker this thread fetches from; supplied to the constructor. */
    private final BrokerEndPoint sourceBroker;
    /** Broker configuration supplied to the constructor; source of every derived setting below. */
    private final KafkaConfig brokerConfig;
    /** Replica manager used to look up local replicas, the log manager and ZooKeeper utilities. */
    private final ReplicaManager replicaMgr;
    /** Clock handed to the network client and used to timestamp requests. */
    private final Time time;
    /** Replication quota manager consulted for throttling and byte accounting. */
    private final ReplicationQuotaManager quota;
    /** Fetch request version: 3, 2, 1 or 0 depending on the configured inter-broker protocol version (see constructor). */
    private final short fetchRequestVersion;
    /** Value of {@code brokerConfig.replicaSocketTimeoutMs()}. */
    private final int socketTimeout;
    /** Value of {@code brokerConfig.brokerId()}. */
    private final int replicaId;
    /** Value of {@code brokerConfig.replicaFetchWaitMaxMs()}. */
    private final Integer maxWait;
    /** Value of {@code brokerConfig.replicaFetchMinBytes()}. */
    private final Integer minBytes;
    /** Value of {@code brokerConfig.replicaFetchResponseMaxBytes()}. */
    private final Integer maxBytes;
    /** Value of {@code brokerConfig.replicaFetchMaxBytes()}; used as the per-partition fetch size. */
    private final Integer fetchSize;
    /** Node built from the source broker's id, host and port. */
    private final Node sourceNode;
    /** Network client created in the constructor; closed by {@link #shutdown()}. */
    private final NetworkClient networkClient;

    /** @return the fetch request version selected in the constructor */
    private short fetchRequestVersion() {
        return fetchRequestVersion;
    }

    /** @return the socket timeout taken from the broker configuration */
    private int socketTimeout() {
        return socketTimeout;
    }

    /** @return the id this broker sends as the replica id in fetch requests */
    private int replicaId() {
        return replicaId;
    }

    /** @return the maximum wait passed to the fetch request builder */
    private Integer maxWait() {
        return maxWait;
    }

    /** @return the minimum bytes passed to the fetch request builder */
    private Integer minBytes() {
        return minBytes;
    }

    /** @return the maximum response size set on the fetch request builder */
    private Integer maxBytes() {
        return maxBytes;
    }

    /** @return the per-partition fetch size used when building fetch requests */
    private Integer fetchSize() {
        return fetchSize;
    }

    /** @return the client id handed to the network client, i.e. the superclass' {@code name()} */
    private String clientId() {
        return super.name();
    }

    /** @return the node describing the source broker */
    private Node sourceNode() {
        return sourceNode;
    }

    /** @return the network client used for all requests to the source broker */
    private NetworkClient networkClient() {
        return networkClient;
    }

    /**
     * Runs the superclass shutdown first and, once that has returned, closes the
     * network client owned by this thread.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        NetworkClient client = this.networkClient();
        client.close();
    }

    /**
     * Appends the records fetched for one partition to the local replica's log and
     * then updates the replica's high watermark.
     *
     * @param topicPartition partition the data belongs to
     * @param fetchOffset    offset the fetch was issued at; must equal the replica's current log end offset
     * @param partitionData  fetched data (records and the leader's high watermark)
     * @throws RuntimeException if {@code fetchOffset} differs from the replica's log end offset
     */
    public void processPartitionData(TopicPartition topicPartition, long fetchOffset, PartitionData partitionData) {
        try {
            // The replica lookup returns an Option; .get() is called on it without an emptiness check.
            Replica replica = (Replica)this.replicaMgr.getReplica(topicPartition, this.replicaMgr.getReplica$default$2()).get();
            MemoryRecords records = partitionData.toRecords();
            this.maybeWarnIfOversizedRecords(records, topicPartition);
            // Refuse to append when the fetched offset does not line up with the local log end.
            if (fetchOffset != replica.logEndOffset().messageOffset()) {
                throw new RuntimeException(new StringOps(Predef$.MODULE$.augmentString("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition, BoxesRunTime.boxToLong((long)fetchOffset), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset())})));
            }
            if (this.logger().isTraceEnabled()) {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)replica.brokerId()), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), topicPartition, BoxesRunTime.boxToInteger((int)records.sizeInBytes()), BoxesRunTime.boxToLong((long)partitionData.highWatermark())})));
            }
            // NOTE(review): the meaning of the `false` flag is not visible here — presumably "do not assign offsets"; confirm against Log.append.
            ((Log)replica.log().get()).append(records, false);
            if (this.logger().isTraceEnabled()) {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)replica.brokerId()), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger((int)records.sizeInBytes()), topicPartition})));
            }
            // Follower high watermark = min(local log end offset read after the append, leader's high watermark).
            long followerHighWatermark = RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(replica.logEndOffset().messageOffset()), partitionData.highWatermark());
            replica.highWatermark_$eq(new LogOffsetMetadata(followerHighWatermark, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()));
            if (this.logger().isTraceEnabled()) {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Follower ", " set replica high watermark for partition ", " to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)replica.brokerId()), topicPartition, BoxesRunTime.boxToLong((long)followerHighWatermark)})));
            }
            // Only throttled partitions count towards the replication quota.
            if (this.quota.isThrottled(topicPartition)) {
                this.quota.record(records.sizeInBytes());
            }
        }
        catch (KafkaStorageException e) {
            // A storage failure is logged as fatal and the JVM is halted immediately with status 1.
            this.fatal((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Disk error while replicating data for ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition})), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            Runtime.getRuntime().halt(1);
        }
    }

    /**
     * Logs an error when a fetch issued with request version 2 or lower returned a
     * non-empty record set that contains no valid bytes.
     *
     * @param records        records returned for the partition
     * @param topicPartition partition the records were fetched for
     */
    public void maybeWarnIfOversizedRecords(MemoryRecords records, TopicPartition topicPartition) {
        // Same checks, same short-circuit order as the individual exit conditions: version, size, valid bytes.
        boolean bytesButNothingValid = this.fetchRequestVersion() <= 2 && records.sizeInBytes() > 0 && records.validBytes() <= 0;
        if (!bytesButNothingValid) {
            return;
        }
        this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() ->
            new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition ", ". "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition}))
                + "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large "
                + "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be "
                + "equal or larger than your settings for max.message.bytes, both at a broker and topic level.");
    }

    /**
     * Decides where to resume fetching after the leader reported the fetch offset as out of range.
     *
     * <p>If the leader's latest offset is behind the local log end offset, the local log is
     * truncated to the leader's latest offset, unless unclean leader election is disabled for
     * the topic, in which case the broker logs a fatal message and exits with status 1.
     * Otherwise the leader's start offset is looked up; if it is ahead of the local log end
     * offset the local log is truncated fully and restarted at that offset.
     *
     * @param topicPartition partition whose fetch offset was out of range
     * @return the offset to fetch from next
     */
    @Override
    public long handleOffsetOutOfRange(TopicPartition topicPartition) {
        Replica replica = (Replica)this.replicaMgr.getReplica(topicPartition, this.replicaMgr.getReplica$default$2()).get();
        // -1L asks the leader for its latest offset (see the log messages below).
        long leaderEndOffset = this.earliestOrLatestOffset(topicPartition, -1L, this.brokerConfig.brokerId());
        if (leaderEndOffset < replica.logEndOffset().messageOffset()) {
            // Topic-level config is fetched from ZooKeeper and layered over the broker's original properties.
            if (!Predef$.MODULE$.Boolean2boolean(LogConfig$.MODULE$.fromProps(this.brokerConfig.originals(), AdminUtils$.MODULE$.fetchEntityConfig(this.replicaMgr.zkUtils(), ConfigType$.MODULE$.Topic(), topicPartition.topic())).uncleanLeaderElectionEnable())) {
                this.fatal((Function0<String>)(Function0 & Serializable & scala.Serializable)() ->
                    new StringOps(Predef$.MODULE$.augmentString("Exiting because log truncation is not allowed for partition %s,")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topicPartition}))
                        + new StringOps(Predef$.MODULE$.augmentString(" Current leader %d's latest offset %d is less than replica %d's latest offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.sourceBroker.id()), BoxesRunTime.boxToLong((long)leaderEndOffset), BoxesRunTime.boxToInteger((int)this.brokerConfig.brokerId()), BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset())})));
                System.exit(1);
            }
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() ->
                new StringOps(Predef$.MODULE$.augmentString("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.brokerConfig.brokerId()), topicPartition, BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger((int)this.sourceBroker.id()), BoxesRunTime.boxToLong((long)leaderEndOffset)})));
            this.replicaMgr.logManager().truncateTo((Map<TopicPartition, Object>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)BoxesRunTime.boxToLong((long)leaderEndOffset))}))));
            return leaderEndOffset;
        }

        // -2L asks the leader for its start (earliest) offset.
        long leaderStartOffset = this.earliestOrLatestOffset(topicPartition, -2L, this.brokerConfig.brokerId());
        this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() ->
            new StringOps(Predef$.MODULE$.augmentString("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.brokerConfig.brokerId()), topicPartition, BoxesRunTime.boxToLong((long)replica.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger((int)this.sourceBroker.id()), BoxesRunTime.boxToLong((long)leaderStartOffset)})));
        // Computed before any truncation so it reflects the log end offset as it was when the error was handled.
        long offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset().messageOffset());
        if (leaderStartOffset > replica.logEndOffset().messageOffset()) {
            this.replicaMgr.logManager().truncateFullyAndStartAt(topicPartition, leaderStartOffset);
        }
        return offsetToFetch;
    }

    /**
     * Delays further fetching for the given partitions by the configured replica
     * fetch backoff.
     *
     * @param partitions partitions whose fetch failed
     */
    @Override
    public void handlePartitionsWithErrors(Iterable<TopicPartition> partitions) {
        int backoffMs = Predef$.MODULE$.Integer2int(this.brokerConfig.replicaFetchBackoffMs());
        this.delayPartitions(partitions, backoffMs);
    }

    /**
     * Sends the fetch request to the source broker and converts the response into a
     * sequence of (partition, partition data) pairs.
     *
     * @param fetchRequest request wrapper whose underlying builder is sent
     * @return one pair per partition contained in the response
     */
    public Seq<Tuple2<TopicPartition, PartitionData>> fetch(FetchRequest fetchRequest) {
        ClientResponse response = this.sendRequest((AbstractRequest.Builder<? extends AbstractRequest>)fetchRequest.underlying());
        FetchResponse body = (FetchResponse)response.responseBody();
        java.util.Map perPartition = (java.util.Map)body.responseData();
        return (Seq)((scala.collection.mutable.MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(perPartition).asScala()).toSeq().map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 entry = x0$1;
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            TopicPartition partition = (TopicPartition)entry._1();
            FetchResponse.PartitionData data = (FetchResponse.PartitionData)entry._2();
            // Wrap the client-level partition data in this thread's PartitionData adapter.
            return new Tuple2<TopicPartition, PartitionData>(partition, new PartitionData(data));
        }, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Sends one request to the source broker and blocks until its response arrives.
     *
     * <p>Waits up to the socket timeout for the connection to become ready. On any
     * failure the connection to the source broker is closed before the failure is
     * rethrown.
     *
     * @param requestBuilder builder of the request to send
     * @return the response received from the source broker
     */
    private ClientResponse sendRequest(AbstractRequest.Builder<? extends AbstractRequest> requestBuilder) {
        try {
            boolean ready = NetworkClientBlockingOps$.MODULE$.blockingReady$extension(NetworkClientBlockingOps$.MODULE$.networkClientBlockingOps(this.networkClient()), this.sourceNode(), this.socketTimeout(), this.time);
            if (!ready) {
                throw new SocketTimeoutException(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to connect within ", " ms"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.socketTimeout())})));
            }
            String destination = ((Object)BoxesRunTime.boxToInteger((int)this.sourceBroker.id())).toString();
            ClientRequest request = this.networkClient().newClientRequest(destination, requestBuilder, this.time.milliseconds(), true);
            return NetworkClientBlockingOps$.MODULE$.blockingSendAndReceive$extension(NetworkClientBlockingOps$.MODULE$.networkClientBlockingOps(this.networkClient()), request, this.time);
        }
        catch (Throwable failure) {
            // Drop the connection so the next attempt starts from a clean state, then propagate.
            this.networkClient().close(((Object)BoxesRunTime.boxToInteger((int)this.sourceBroker.id())).toString());
            throw failure;
        }
    }

    /**
     * Asks the source broker for a single offset of the given partition.
     *
     * <p>When the inter-broker protocol is at least 0.10.1-IV2 a version 1 list-offset
     * request is sent and the single {@code offset} field of the response is returned;
     * otherwise a version 0 request asking for one offset is sent and the first entry
     * of the returned {@code offsets} list is used.
     *
     * @param topicPartition   partition to query
     * @param earliestOrLatest target time sent to the leader (callers in this class pass -1L or -2L)
     * @param consumerId       id passed to the list-offset request builder
     * @return the offset reported by the source broker
     */
    private long earliestOrLatestOffset(TopicPartition topicPartition, long earliestOrLatest, int consumerId) {
        AbstractRequest.Builder listOffsetBuilder;
        if (this.brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_1_IV2$.MODULE$)) {
            Map targetTimes = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)Predef$.MODULE$.long2Long(earliestOrLatest))}));
            listOffsetBuilder = new ListOffsetRequest.Builder(consumerId).setTargetTimes((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(targetTimes).asJava()).setVersion((short)1);
        } else {
            Map offsetData = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)new ListOffsetRequest.PartitionData(earliestOrLatest, 1))}));
            listOffsetBuilder = new ListOffsetRequest.Builder(consumerId).setOffsetData((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(offsetData).asJava()).setVersion((short)0);
        }

        ClientResponse rawResponse = this.sendRequest((AbstractRequest.Builder<? extends AbstractRequest>)listOffsetBuilder);
        ListOffsetResponse listOffsets = (ListOffsetResponse)rawResponse.responseBody();
        ListOffsetResponse.PartitionData partitionData = (ListOffsetResponse.PartitionData)listOffsets.responseData().get(topicPartition);

        // Any non-NONE error code is surfaced as the corresponding exception.
        Errors error = Errors.forCode((short)partitionData.errorCode);
        if (!Errors.NONE.equals(error)) {
            throw error.exception();
        }

        if (this.brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_1_IV2$.MODULE$)) {
            return Predef$.MODULE$.Long2long(partitionData.offset);
        }
        return Predef$.MODULE$.Long2long((Long)partitionData.offsets.get(0));
    }

    /**
     * Builds the fetch request for the next round.
     *
     * <p>A partition is included only when its fetch state is active and the follower
     * is not currently throttled for it. Partitions are added in the iteration order of
     * {@code partitionMap}.
     *
     * @param partitionMap partitions and their current fetch state
     * @return wrapper around the configured Kafka fetch request builder
     */
    @Override
    public FetchRequest buildFetchRequest(Seq<Tuple2<TopicPartition, PartitionFetchState>> partitionMap) {
        // The Kafka client FetchRequest is fully qualified: inside this class the simple name
        // FetchRequest denotes the nested ReplicaFetcherThread.FetchRequest wrapper.
        LinkedHashMap<TopicPartition, org.apache.kafka.common.requests.FetchRequest.PartitionData> requestMap = new LinkedHashMap<>();
        partitionMap.foreach((Function1<Tuple2<TopicPartition, PartitionFetchState>, BoxedUnit> & Serializable & scala.Serializable)entry -> {
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            TopicPartition topicPartition = entry._1();
            PartitionFetchState partitionFetchState = entry._2();
            if (partitionFetchState.isActive() && !this.shouldFollowerThrottle(this.quota, topicPartition)) {
                requestMap.put(topicPartition, new org.apache.kafka.common.requests.FetchRequest.PartitionData(partitionFetchState.offset(), Predef$.MODULE$.Integer2int(this.fetchSize())));
            }
            // foreach discards the result; UNIT is returned on both paths.
            return BoxedUnit.UNIT;
        });
        org.apache.kafka.common.requests.FetchRequest.Builder requestBuilder = new org.apache.kafka.common.requests.FetchRequest.Builder(Predef$.MODULE$.Integer2int(this.maxWait()), Predef$.MODULE$.Integer2int(this.minBytes()), requestMap).setReplicaId(this.replicaId()).setMaxBytes(Predef$.MODULE$.Integer2int(this.maxBytes()));
        requestBuilder.setVersion(this.fetchRequestVersion());
        return new FetchRequest(requestBuilder);
    }

    /**
     * Tells whether fetching for the partition should be skipped this round: the
     * partition is throttled, the quota is exceeded, and the replica is not in sync.
     *
     * @param quota          quota to consult
     * @param topicPartition partition being considered
     * @return {@code true} when all three conditions hold
     */
    private boolean shouldFollowerThrottle(ReplicaQuota quota, TopicPartition topicPartition) {
        // The in-sync state is always read first, before the quota is consulted.
        boolean inSync = this.fetcherLagStats().isReplicaInSync(topicPartition.topic(), topicPartition.partition());
        if (!quota.isThrottled(topicPartition)) {
            return false;
        }
        if (!quota.isQuotaExceeded()) {
            return false;
        }
        return !inSync;
    }

    /**
     * Creates a fetcher thread that replicates from {@code sourceBroker}.
     *
     * @param name         thread name; also passed to the superclass as its second argument and used as the network client id
     * @param fetcherId    id of this fetcher; used only as the "fetcher-id" metric tag
     * @param sourceBroker broker to fetch from
     * @param brokerConfig broker configuration from which all fetch and network settings are read
     * @param replicaMgr   replica manager stored for later replica, log and ZooKeeper access
     * @param metrics      metrics registry handed to the selector
     * @param time         clock handed to the selector and network client
     * @param quota        replication quota manager
     */
    public ReplicaFetcherThread(String name, int fetcherId, BrokerEndPoint sourceBroker, KafkaConfig brokerConfig, ReplicaManager replicaMgr, Metrics metrics, Time time, ReplicationQuotaManager quota) {
        // NOTE(review): these fields are assigned before super(...). This looks like the initialization
        // order emitted by the Scala compiler; Java compilers without flexible constructor bodies reject it.
        // Do not reorder without checking whether the superclass constructor depends on it.
        this.sourceBroker = sourceBroker;
        this.brokerConfig = brokerConfig;
        this.replicaMgr = replicaMgr;
        this.time = time;
        this.quota = quota;
        super(name, name, sourceBroker, Predef$.MODULE$.Integer2int(brokerConfig.replicaFetchBackoffMs()), false);
        // Highest fetch version allowed by the inter-broker protocol: 3 (>= 0.10.1-IV1), 2 (>= 0.10.0-IV0), 1 (>= 0.9.0), else 0.
        this.fetchRequestVersion = (short)(brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_1_IV1$.MODULE$) ? 3 : (brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_0_IV0$.MODULE$) ? 2 : (brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_9_0$.MODULE$) ? 1 : 0)));
        this.socketTimeout = Predef$.MODULE$.Integer2int(brokerConfig.replicaSocketTimeoutMs());
        this.replicaId = brokerConfig.brokerId();
        this.maxWait = brokerConfig.replicaFetchWaitMaxMs();
        this.minBytes = brokerConfig.replicaFetchMinBytes();
        this.maxBytes = brokerConfig.replicaFetchResponseMaxBytes();
        this.fetchSize = brokerConfig.replicaFetchMaxBytes();
        this.sourceNode = new Node(sourceBroker.id(), sourceBroker.host(), sourceBroker.port());
        // Channel builder configured from the inter-broker security protocol and SASL settings.
        ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder((SecurityProtocol)brokerConfig.interBrokerSecurityProtocol(), (LoginType)LoginType.SERVER, (java.util.Map)brokerConfig.values(), (String)brokerConfig.saslMechanismInterBrokerProtocol(), (boolean)brokerConfig.saslInterBrokerHandshakeRequestEnable());
        // Selector metrics are grouped under "replica-fetcher" and tagged with the source broker id and fetcher id.
        Selector selector = new Selector(-1, Predef$.MODULE$.Long2long(brokerConfig.connectionsMaxIdleMs()), metrics, time, "replica-fetcher", (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"broker-id"), (Object)((Object)BoxesRunTime.boxToInteger((int)sourceBroker.id())).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"fetcher-id"), (Object)((Object)BoxesRunTime.boxToInteger((int)fetcherId)).toString())}))).asJava(), false, channelBuilder);
        // NOTE(review): the positional literals 1, 0L and -1 are presumably max in-flight requests,
        // reconnect backoff and send buffer size — confirm against the NetworkClient constructor.
        this.networkClient = new NetworkClient((Selectable)selector, (MetadataUpdater)new ManualMetadataUpdater(), this.clientId(), 1, 0L, -1, Predef$.MODULE$.Integer2int(brokerConfig.replicaSocketReceiveBufferBytes()), Predef$.MODULE$.Integer2int(brokerConfig.requestTimeoutMs()), time, false);
    }

    public static class FetchRequest
    implements AbstractFetcherThread.FetchRequest {
        private final FetchRequest.Builder underlying;

        public FetchRequest.Builder underlying() {
            return this.underlying;
        }

        @Override
        public boolean isEmpty() {
            return this.underlying().fetchData().isEmpty();
        }

        @Override
        public long offset(TopicPartition topicPartition) {
            return ((FetchRequest.PartitionData)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter((java.util.Map)this.underlying().fetchData()).asScala()).apply((Object)topicPartition)).offset;
        }

        public FetchRequest(FetchRequest.Builder underlying) {
            this.underlying = underlying;
        }
    }

    public static class PartitionData
    implements AbstractFetcherThread.PartitionData {
        private final FetchResponse.PartitionData underlying;

        public FetchResponse.PartitionData underlying() {
            return this.underlying;
        }

        @Override
        public short errorCode() {
            return this.underlying().errorCode;
        }

        @Override
        public MemoryRecords toRecords() {
            return (MemoryRecords)this.underlying().records;
        }

        @Override
        public long highWatermark() {
            return this.underlying().highWatermark;
        }

        @Override
        public Option<Throwable> exception() {
            Errors errors = Errors.forCode((short)this.errorCode());
            Object object = Errors.NONE.equals(errors) ? None$.MODULE$ : new Some((Object)errors.exception());
            return object;
        }

        public PartitionData(FetchResponse.PartitionData underlying) {
            this.underlying = underlying;
        }
    }
}

