/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.api.FetchResponsePartitionData;
import kafka.api.PartitionFetchInfo;
import kafka.cluster.Broker;
import kafka.common.ClientIdAndBroker;
import kafka.common.ErrorMapping$;
import kafka.common.KafkaException;
import kafka.common.TopicAndPartition;
import kafka.consumer.PartitionTopicInfo$;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.InvalidMessageException;
import kafka.message.MessageAndOffset;
import kafka.server.AbstractFetcherThread$;
import kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$;
import kafka.server.FetcherLagStats;
import kafka.server.FetcherStats;
import kafka.utils.ShutdownableThread;
import kafka.utils.Utils$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.ScalaObject;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.TraversableOnce;
import scala.collection.generic.Growable;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashSet;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\t\u0005b!B\u0001\u0003\u0003\u00039!!F!cgR\u0014\u0018m\u0019;GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0006\u0003\u0007\u0011\taa]3sm\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0019\u0001\u0001\u0003\b\u0011\u0005%aQ\"\u0001\u0006\u000b\u0005-!\u0011!B;uS2\u001c\u0018BA\u0007\u000b\u0005I\u0019\u0006.\u001e;e_^t\u0017M\u00197f)\"\u0014X-\u00193\u0011\u0005=\u0011R\"\u0001\t\u000b\u0003E\tQa]2bY\u0006L!a\u0005\t\u0003\u0017M\u001b\u0017\r\\1PE*,7\r\u001e\u0005\n+\u0001\u0011\t\u0011)A\u0005-u\tAA\\1nKB\u0011qC\u0007\b\u0003\u001faI!!\u0007\t\u0002\rA\u0013X\rZ3g\u0013\tYBD\u0001\u0004TiJLgn\u001a\u0006\u00033AI!!\u0006\u0007\t\u0011}\u0001!\u0011!Q\u0001\nY\t\u0001b\u00197jK:$\u0018\n\u001a\u0005\tC\u0001\u0011\t\u0011)A\u0005E\u0005a1o\\;sG\u0016\u0014%o\\6feB\u00111EJ\u0007\u0002I)\u0011Q\u0005B\u0001\bG2,8\u000f^3s\u0013\t9CE\u0001\u0004Ce>\\WM\u001d\u0005\tS\u0001\u0011\t\u0011)A\u0005U\u0005i1o\\2lKR$\u0016.\\3pkR\u0004\"aD\u0016\n\u00051\u0002\"aA%oi\"Aa\u0006\u0001B\u0001B\u0003%!&\u0001\tt_\u000e\\W\r\u001e\"vM\u001a,'oU5{K\"A\u0001\u0007\u0001B\u0001B\u0003%!&A\u0005gKR\u001c\u0007nU5{K\"A!\u0007\u0001B\u0001B\u0003%!&A\bgKR\u001c\u0007.\u001a:Ce>\\WM]%e\u0011!!\u0004A!A!\u0002\u0013Q\u0013aB7bq^\u000b\u0017\u000e\u001e\u0005\tm\u0001\u0011\t\u0011)A\u0005U\u0005AQ.\u001b8CsR,7\u000fC\u00059\u0001\t\u0005\t\u0015!\u0003:y\u0005y\u0011n]%oi\u0016\u0014(/\u001e9uS\ndW\r\u0005\u0002\u0010u%\u00111\b\u0005\u0002\b\u0005>|G.Z1o\u0013\tAD\u0002C\u0003?\u0001\u0011\u0005q(\u0001\u0004=S:LGO\u0010\u000b\f\u0001\n\u001bE)\u0012$H\u0011&S5\n\u0005\u0002B\u00015\t!\u0001C\u0003\u0016{\u0001\u0007a\u0003C\u0003 {\u0001\u0007a\u0003C\u0003\"{\u0001\u0007!\u0005C\u0003*{\u0001\u0007!\u0006C\u0003/{\u0001\u0007!\u0006C\u00031{\u0001\u0007!\u0006C\u00043{A\u0005\t\u0019\u0001\u0016\t\u000fQj\u0004\u0013!a\u0001U!9a'\u0010I\u0001\u0002\u0004Q\u0003b\u0002\u001d>!\u0003\u0005\r!\u000f\u0005\b\u001b\u0002\u0011\r\u0011\"\u0003O\u00031\u0001\u0018M\u001d;ji&|g.T1q+\u0005y\u0005\u0003\u0002)V/vk\u0011!\u0015\u0006\u0003%N\u000bq!\\;uC\ndWM\u0003\u0002U!\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005Y\u000b&a\u0002%bg\"l\u0015\r\u001d\t\u00031nk\u0011!\u0017\u0006\u00035\u0012\taaY8n[>t\u0017B\u0001/Z\u0005E!v\u000e]5d\u0003:$\u0007+\u0019:uSRLwN\u001c\t\u0003\u001fyK!a\u0018\t\u0003\t1{gn\u001a\u0005\u0007C\u0002\u0001\u000b\u0011B(\u0002\u001bA\f'\u000f^5uS>tW*\u00199!\u0011\u001d\u0019\u0007A1A\u0005\n\u0011\f\u0001\u0003]1si&$\u0018n\u001c8NCBdunY6\u0016\u0003\u0015\u0004\"AZ8\u000e\u0003\u001dT!\u0001[5\u0002\u000b1|7m[:\u000b\u0005)\\\u0017AC2p]\u000e,(O]3oi*\u0011A.\\\u0001\u0005kRLGNC\u0001o\u0003\u0011Q\u0017M^1\n\u0005A<'!\u0004*fK:$(/\u00198u\u0019>\u001c7\u000e\u0003\u0004s\u0001\u0001\u0006I!Z\u0001\u0012a\u0006\u0014H/\u001b;j_:l\u0015\r\u001d'pG.\u0004\u0003b\u0002;\u0001\u0005\u0004%I!^\u0001\u0011a\u0006\u0014H/\u001b;j_:l\u0015\r]\"p]\u0012,\u0012A\u001e\t\u0003M^L!\u0001_4\u0003\u0013\r{g\u000eZ5uS>t\u0007B\u0002>\u0001A\u0003%a/A\tqCJ$\u0018\u000e^5p]6\u000b\u0007oQ8oI\u0002Bq\u0001 \u0001C\u0002\u0013\u0005Q0\u0001\btS6\u0004H.Z\"p]N,X.\u001a:\u0016\u0003y\u00042a`A\u0003\u001b\t\t\tAC\u0002\u0002\u0004\u0011\t\u0001bY8ogVlWM]\u0005\u0005\u0003\u000f\t\tA\u0001\bTS6\u0004H.Z\"p]N,X.\u001a:\t\u000f\u0005-\u0001\u0001)A\u0005}\u0006y1/[7qY\u0016\u001cuN\\:v[\u0016\u0014\b\u0005C\u0005\u0002\u0010\u0001\u0011\r\u0011\"\u0003\u0002\u0012\u0005AQ.\u001a;sS\u000eLE-\u0006\u0002\u0002\u0014A\u0019\u0001,!\u0006\n\u0007\u0005]\u0011LA\tDY&,g\u000e^%e\u0003:$'I]8lKJD\u0001\"a\u0007\u0001A\u0003%\u00111C\u0001\n[\u0016$(/[2JI\u0002B\u0011\"a\b\u0001\u0005\u0004%\t!!\t\u0002\u0019\u0019,Go\u00195feN#\u0018\r^:\u0016\u0005\u0005\r\u0002cA!\u0002&%\u0019\u0011q\u0005\u0002\u0003\u0019\u0019+Go\u00195feN#\u0018\r^:\t\u0011\u0005-\u0002\u0001)A\u0005\u0003G\tQBZ3uG\",'o\u0015;biN\u0004\u0003\"CA\u0018\u0001\t\u0007I\u0011AA\u0019\u0003=1W\r^2iKJd\u0015mZ*uCR\u001cXCAA\u001a!\r\t\u0015QG\u0005\u0004\u0003o\u0011!a\u0004$fi\u000eDWM\u001d'bON#\u0018\r^:\t\u0011\u0005m\u0002\u0001)A\u0005\u0003g\t\u0001CZ3uG\",'\u000fT1h'R\fGo\u001d\u0011\t\u0013\u0005}\u0002A1A\u0005\u0002\u0005\u0005\u0013a\u00054fi\u000eD'+Z9vKN$()^5mI\u0016\u0014XCAA\"!\u0011\t)%a\u0013\u000e\u0005\u0005\u001d#bAA%\t\u0005\u0019\u0011\r]5\n\t\u00055\u0013q\t\u0002\u0014\r\u0016$8\r\u001b*fcV,7\u000f\u001e\"vS2$WM\u001d\u0005\t\u0003#\u0002\u0001\u0015!\u0003\u0002D\u0005!b-\u001a;dQJ+\u0017/^3ti\n+\u0018\u000e\u001c3fe\u0002Bq!!\u0016\u0001\r\u0003\t9&\u0001\u000bqe>\u001cWm]:QCJ$\u0018\u000e^5p]\u0012\u000bG/\u0019\u000b\t\u00033\ny&a\u0019\u0002hA\u0019q\"a\u0017\n\u0007\u0005u\u0003C\u0001\u0003V]&$\bbBA1\u0003'\u0002\raV\u0001\u0012i>\u0004\u0018nY!oIB\u000b'\u000f^5uS>t\u0007bBA3\u0003'\u0002\r!X\u0001\fM\u0016$8\r[(gMN,G\u000f\u0003\u0005\u0002j\u0005M\u0003\u0019AA6\u00035\u0001\u0018M\u001d;ji&|g\u000eR1uCB!\u0011QIA7\u0013\u0011\ty'a\u0012\u00035\u0019+Go\u00195SKN\u0004xN\\:f!\u0006\u0014H/\u001b;j_:$\u0015\r^1\t\u000f\u0005M\u0004A\"\u0001\u0002v\u00051\u0002.\u00198eY\u0016|eMZ:fi>+Ho\u00144SC:<W\rF\u0002^\u0003oBq!!\u0019\u0002r\u0001\u0007q\u000bC\u0004\u0002|\u00011\t!! \u00025!\fg\u000e\u001a7f!\u0006\u0014H/\u001b;j_:\u001cx+\u001b;i\u000bJ\u0014xN]:\u0015\t\u0005e\u0013q\u0010\u0005\t\u0003\u0003\u000bI\b1\u0001\u0002\u0004\u0006Q\u0001/\u0019:uSRLwN\\:\u0011\u000b\u0005\u0015\u0015QS,\u000f\t\u0005\u001d\u0015\u0011\u0013\b\u0005\u0003\u0013\u000by)\u0004\u0002\u0002\f*\u0019\u0011Q\u0012\u0004\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0012bAAJ!\u00059\u0001/Y2lC\u001e,\u0017\u0002BAL\u00033\u0013\u0001\"\u0013;fe\u0006\u0014G.\u001a\u0006\u0004\u0003'\u0003\u0002bBAO\u0001\u0011\u0005\u0013qT\u0001\tg\",H\u000fZ8x]R\u0011\u0011\u0011\f\u0005\b\u0003G\u0003A\u0011IAP\u0003\u0019!wnV8sW\"9\u0011q\u0015\u0001\u0005\n\u0005%\u0016a\u00059s_\u000e,7o\u001d$fi\u000eD'+Z9vKN$H\u0003BA-\u0003WC\u0001\"!,\u0002&\u0002\u0007\u0011qV\u0001\rM\u0016$8\r\u001b*fcV,7\u000f\u001e\t\u0005\u0003\u000b\n\t,\u0003\u0003\u00024\u0006\u001d#\u0001\u0004$fi\u000eD'+Z9vKN$\bbBA\\\u0001\u0011\u0005\u0011\u0011X\u0001\u000eC\u0012$\u0007+\u0019:uSRLwN\\:\u0015\t\u0005e\u00131\u0018\u0005\t\u0003{\u000b)\f1\u0001\u0002@\u0006\u0019\u0002/\u0019:uSRLwN\\!oI>3gm]3ugB1\u0011\u0011YAb/vk\u0011aU\u0005\u0004\u0003\u000b\u001c&aA'ba\"9\u0011\u0011\u001a\u0001\u0005\u0002\u0005-\u0017\u0001\u0005:f[>4X\rU1si&$\u0018n\u001c8t)\u0011\tI&!4\t\u0011\u0005=\u0017q\u0019a\u0001\u0003#\f!\u0003^8qS\u000e\fe\u000e\u001a)beRLG/[8ogB)\u0011\u0011YAj/&\u0019\u0011Q[*\u0003\u0007M+G\u000fC\u0004\u0002Z\u0002!\t!a7\u0002\u001dA\f'\u000f^5uS>t7i\\;oiR\t!fB\u0005\u0002`\n\t\t\u0011#\u0002\u0002b\u0006)\u0012IY:ue\u0006\u001cGOR3uG\",'\u000f\u00165sK\u0006$\u0007cA!\u0002d\u001aA\u0011AAA\u0001\u0012\u000b\t)oE\u0003\u0002d\u0006\u001dh\u0002\u0005\u0003\u0002j\u0006=XBAAv\u0015\r\ti/\\\u0001\u0005Y\u0006tw-\u0003\u0003\u0002r\u0006-(AB(cU\u0016\u001cG\u000fC\u0004?\u0003G$\t!!>\u0015\u0005\u0005\u0005\bBCA}\u0003G\f\n\u0011\"\u0001\u0002|\u0006q\u0011N\\5uI\u0011,g-Y;mi\u0012:TCAA\u007fU\rQ\u0013q`\u0016\u0003\u0005\u0003\u0001BAa\u0001\u0003\u000e5\u0011!Q\u0001\u0006\u0005\u0005\u000f\u0011I!A\u0005v]\u000eDWmY6fI*\u0019!1\u0002\t\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0003\u0010\t\u0015!!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\"Q!1CAr#\u0003%\t!a?\u0002\u001d%t\u0017\u000e\u001e\u0013eK\u001a\fW\u000f\u001c;%q!Q!qCAr#\u0003%\t!a?\u0002\u001d%t\u0017\u000e\u001e\u0013eK\u001a\fW\u000f\u001c;%s!Q!1DAr#\u0003%\tA!\b\u0002\u001f%t\u0017\u000e\u001e\u0013eK\u001a\fW\u000f\u001c;%cA*\"Aa\b+\u0007e\ny\u0010")
public abstract class AbstractFetcherThread
extends ShutdownableThread
implements ScalaObject {
    public final Broker kafka$server$AbstractFetcherThread$$sourceBroker;
    public final int kafka$server$AbstractFetcherThread$$fetchSize;
    private final HashMap<TopicAndPartition, Object> kafka$server$AbstractFetcherThread$$partitionMap;
    private final ReentrantLock partitionMapLock;
    private final Condition kafka$server$AbstractFetcherThread$$partitionMapCond;
    private final SimpleConsumer simpleConsumer;
    private final ClientIdAndBroker metricId;
    private final FetcherStats fetcherStats;
    private final FetcherLagStats fetcherLagStats;
    private final FetchRequestBuilder fetchRequestBuilder;

    public static final boolean init$default$10() {
        return AbstractFetcherThread$.MODULE$.init$default$10();
    }

    public static final int init$default$9() {
        return AbstractFetcherThread$.MODULE$.init$default$9();
    }

    public static final int init$default$8() {
        return AbstractFetcherThread$.MODULE$.init$default$8();
    }

    public static final int init$default$7() {
        return AbstractFetcherThread$.MODULE$.init$default$7();
    }

    public final HashMap<TopicAndPartition, Object> kafka$server$AbstractFetcherThread$$partitionMap() {
        return this.kafka$server$AbstractFetcherThread$$partitionMap;
    }

    private ReentrantLock partitionMapLock() {
        return this.partitionMapLock;
    }

    public final Condition kafka$server$AbstractFetcherThread$$partitionMapCond() {
        return this.kafka$server$AbstractFetcherThread$$partitionMapCond;
    }

    public SimpleConsumer simpleConsumer() {
        return this.simpleConsumer;
    }

    private ClientIdAndBroker metricId() {
        return this.metricId;
    }

    public FetcherStats fetcherStats() {
        return this.fetcherStats;
    }

    public FetcherLagStats fetcherLagStats() {
        return this.fetcherLagStats;
    }

    public FetchRequestBuilder fetchRequestBuilder() {
        return this.fetchRequestBuilder;
    }

    public abstract void processPartitionData(TopicAndPartition var1, long var2, FetchResponsePartitionData var4);

    public abstract long handleOffsetOutOfRange(TopicAndPartition var1);

    public abstract void handlePartitionsWithErrors(Iterable<TopicAndPartition> var1);

    @Override
    public void shutdown() {
        super.shutdown();
        this.simpleConsumer().close();
    }

    @Override
    public void doWork() {
        Utils$.MODULE$.inLock(this.partitionMapLock(), new Serializable(this){
            public static final long serialVersionUID;
            private final AbstractFetcherThread $outer;

            static {
                long l = serialVersionUID = 0L;
            }

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                Object object = this.$outer.kafka$server$AbstractFetcherThread$$partitionMap().isEmpty() ? BoxesRunTime.boxToBoolean((boolean)this.$outer.kafka$server$AbstractFetcherThread$$partitionMapCond().await(200L, TimeUnit.MILLISECONDS)) : BoxedUnit.UNIT;
                this.$outer.kafka$server$AbstractFetcherThread$$partitionMap().foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID;
                    private final $anonfun$doWork$1 $outer;

                    static {
                        long l = serialVersionUID = 0L;
                    }

                    public final FetchRequestBuilder apply(Tuple2<TopicAndPartition, Object> tuple2) {
                        Tuple2<TopicAndPartition, Object> tuple22 = tuple2;
                        if (tuple22 != null) {
                            TopicAndPartition topicAndPartition;
                            TopicAndPartition topicAndPartition2 = topicAndPartition = (TopicAndPartition)tuple22._1();
                            return this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().fetchRequestBuilder().addFetch(topicAndPartition2.topic(), topicAndPartition2.partition(), BoxesRunTime.unboxToLong((Object)tuple22._2()), this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$fetchSize);
                        }
                        throw new MatchError(tuple22);
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                });
            }

            public AbstractFetcherThread kafka$server$AbstractFetcherThread$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
        FetchRequest fetchRequest = this.fetchRequestBuilder().build();
        if (!fetchRequest.requestInfo().isEmpty()) {
            this.processFetchRequest(fetchRequest);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void processFetchRequest(FetchRequest fetchRequest$1) {
        HashSet partitionsWithError$1 = new HashSet();
        ObjectRef response$1 = new ObjectRef(null);
        BoxedUnit exceptionResult1 = null;
        try {
            this.trace((Function0<String>)new Serializable(this, fetchRequest$1){
                public static final long serialVersionUID;
                private final AbstractFetcherThread $outer;
                private final FetchRequest fetchRequest$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return Predef$.MODULE$.augmentString("Issuing to broker %d of fetch request %s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.kafka$server$AbstractFetcherThread$$sourceBroker.id()), this.fetchRequest$1}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.fetchRequest$1 = fetchRequest;
                }
            });
            response$1.elem = this.simpleConsumer().fetch(fetchRequest$1);
            exceptionResult1 = BoxedUnit.UNIT;
        }
        catch (Throwable throwable) {
            BoxedUnit boxedUnit;
            if (this.isRunning().get()) {
                this.warn((Function0<String>)new Serializable(this, fetchRequest$1, throwable){
                    public static final long serialVersionUID;
                    private final FetchRequest fetchRequest$1;
                    private final Throwable t$1;

                    static {
                        long l = serialVersionUID = 0L;
                    }

                    public final String apply() {
                        return Predef$.MODULE$.augmentString("Error in fetch %s. Possible cause: %s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.fetchRequest$1, this.t$1.toString()}));
                    }
                    {
                        this.fetchRequest$1 = fetchRequest;
                        this.t$1 = throwable;
                    }
                });
                ReentrantLock reentrantLock = this.partitionMapLock();
                synchronized (reentrantLock) {
                    Growable growable = partitionsWithError$1.$plus$plus$eq((TraversableOnce)this.kafka$server$AbstractFetcherThread$$partitionMap().keys());
                    // MONITOREXIT @DISABLED, blocks:[1, 2, 3, 7] lbl17 : MonitorExitStatement: MONITOREXIT : var6_6
                    boxedUnit = growable;
                }
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            exceptionResult1 = boxedUnit;
        }
        this.fetcherStats().requestRate().mark();
        Object object = (FetchResponse)response$1.elem == null ? BoxedUnit.UNIT : Utils$.MODULE$.inLock(this.partitionMapLock(), new Serializable(this, fetchRequest$1, partitionsWithError$1, response$1){
            public static final long serialVersionUID;
            private final AbstractFetcherThread $outer;
            public final FetchRequest fetchRequest$1;
            public final HashSet partitionsWithError$1;
            private final ObjectRef response$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                ((FetchResponse)this.response$1.elem).data().foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID;
                    private final $anonfun$processFetchRequest$1 $outer;

                    static {
                        long l = serialVersionUID = 0L;
                    }

                    /*
                     * Loose catch block
                     * Enabled force condition propagation
                     * Lifted jumps to return sites
                     */
                    public final Object apply(Tuple2<TopicAndPartition, FetchResponsePartitionData> tuple2) {
                        BoxedUnit boxedUnit;
                        HashSet exceptionResult4;
                        BoxedUnit exceptionResult3;
                        Tuple2<TopicAndPartition, FetchResponsePartitionData> tuple22 = tuple2;
                        if (tuple22 == null) throw new MatchError(tuple22);
                        TopicAndPartition topicAndPartition = (TopicAndPartition)tuple22._1();
                        FetchResponsePartitionData fetchResponsePartitionData = (FetchResponsePartitionData)tuple22._2();
                        TopicAndPartition topicAndPartition2 = topicAndPartition;
                        FetchResponsePartitionData partitionData$1 = fetchResponsePartitionData;
                        Tuple2<String, Object> tuple23 = topicAndPartition2.asTuple();
                        if (tuple23 == null) throw new MatchError(tuple23);
                        Tuple2 tuple24 = new Tuple2(tuple23._1(), tuple23._2());
                        String topic$2 = (String)tuple24._1();
                        int partitionId$2 = tuple24._2$mcI$sp();
                        Option currentOffset$1 = this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$partitionMap().get((Object)topicAndPartition2);
                        if (currentOffset$1.isDefined() && ((PartitionFetchInfo)this.$outer.fetchRequest$1.requestInfo().apply((Object)topicAndPartition2)).offset() == BoxesRunTime.unboxToLong((Object)currentOffset$1.get())) {
                            short s = partitionData$1.error();
                            if (BoxesRunTime.equals((Object)BoxesRunTime.boxToShort((short)ErrorMapping$.MODULE$.NoError()), (Object)BoxesRunTime.boxToShort((short)s))) {
                                long l;
                                exceptionResult3 = null;
                                ByteBufferMessageSet messages = (ByteBufferMessageSet)partitionData$1.messages();
                                int validBytes = messages.validBytes();
                                Option option = messages.shallowIterator().toSeq().lastOption();
                                if (option instanceof Some) {
                                    Some some = (Some)option;
                                    MessageAndOffset messageAndOffset = (MessageAndOffset)some.x();
                                    if (messageAndOffset == null) throw new MatchError((Object)option);
                                    l = messageAndOffset.nextOffset();
                                } else {
                                    None$ none$ = None$.MODULE$;
                                    Option option2 = option;
                                    if (none$ != null ? !none$.equals(option2) : option2 != null) throw new MatchError((Object)option);
                                    l = BoxesRunTime.unboxToLong((Object)currentOffset$1.get());
                                }
                                long newOffset = l;
                                this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$partitionMap().put((Object)topicAndPartition2, (Object)BoxesRunTime.boxToLong((long)newOffset));
                                this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().fetcherLagStats().getFetcherLagStats(topic$2, partitionId$2).lag_$eq(partitionData$1.hw() - newOffset);
                                this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().fetcherStats().byteRate().mark((long)validBytes);
                                this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().processPartitionData(topicAndPartition2, BoxesRunTime.unboxToLong((Object)currentOffset$1.get()), partitionData$1);
                                exceptionResult3 = BoxedUnit.UNIT;
                            }
                            if (BoxesRunTime.equals((Object)BoxesRunTime.boxToShort((short)ErrorMapping$.MODULE$.OffsetOutOfRangeCode()), (Object)BoxesRunTime.boxToShort((short)s))) {
                                exceptionResult4 = null;
                                long newOffset$1 = this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().handleOffsetOutOfRange(topicAndPartition2);
                                this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$partitionMap().put((Object)topicAndPartition2, (Object)BoxesRunTime.boxToLong((long)newOffset$1));
                                this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().error((Function0<String>)new Serializable(this, topic$2, partitionId$2, currentOffset$1, newOffset$1){
                                    public static final long serialVersionUID;
                                    private final String topic$2;
                                    private final int partitionId$2;
                                    private final Option currentOffset$1;
                                    private final long newOffset$1;

                                    static {
                                        long l = serialVersionUID = 0L;
                                    }

                                    public final String apply() {
                                        return Predef$.MODULE$.augmentString("Current offset %d for partition [%s,%d] out of range; reset offset to %d").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.currentOffset$1.get(), this.topic$2, BoxesRunTime.boxToInteger((int)this.partitionId$2), BoxesRunTime.boxToLong((long)this.newOffset$1)}));
                                    }
                                    {
                                        this.topic$2 = string;
                                        this.partitionId$2 = n;
                                        this.currentOffset$1 = option;
                                        this.newOffset$1 = l;
                                    }
                                });
                                exceptionResult4 = BoxedUnit.UNIT;
                            }
                            if (this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().isRunning().get()) {
                                this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().error((Function0<String>)new Serializable(this, partitionData$1, topic$2, partitionId$2){
                                    public static final long serialVersionUID;
                                    private final $anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2 $outer;
                                    private final FetchResponsePartitionData partitionData$1;
                                    private final String topic$2;
                                    private final int partitionId$2;

                                    static {
                                        long l = serialVersionUID = 0L;
                                    }

                                    public final String apply() {
                                        return Predef$.MODULE$.augmentString("Error for partition [%s,%d] to broker %d:%s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.topic$2, BoxesRunTime.boxToInteger((int)this.partitionId$2), BoxesRunTime.boxToInteger((int)this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$sourceBroker.id()), ErrorMapping$.MODULE$.exceptionFor(this.partitionData$1.error()).getClass()}));
                                    }
                                    {
                                        if ($outer == null) {
                                            throw new NullPointerException();
                                        }
                                        this.$outer = $outer;
                                        this.partitionData$1 = fetchResponsePartitionData;
                                        this.topic$2 = string;
                                        this.partitionId$2 = n;
                                    }
                                });
                                boxedUnit = this.$outer.partitionsWithError$1.$plus$eq((Object)topicAndPartition2);
                                return boxedUnit;
                            } else {
                                boxedUnit = BoxedUnit.UNIT;
                            }
                            return boxedUnit;
                        }
                        boxedUnit = BoxedUnit.UNIT;
                        return boxedUnit;
                        catch (Throwable throwable) {
                            throw new KafkaException(Predef$.MODULE$.augmentString("error processing data for partition [%s,%d] offset %d").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic$2, BoxesRunTime.boxToInteger((int)partitionId$2), currentOffset$1.get()})), throwable);
                        }
                        catch (InvalidMessageException invalidMessageException) {
                            this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().logger().error((Object)new StringBuilder().append((Object)"Found invalid messages during fetch for partition [").append((Object)topic$2).append((Object)",").append((Object)BoxesRunTime.boxToInteger((int)partitionId$2)).append((Object)"] offset ").append(currentOffset$1.get()).append((Object)" error ").append((Object)invalidMessageException.getMessage()).toString());
                            exceptionResult3 = BoxedUnit.UNIT;
                        }
                        boxedUnit = exceptionResult3;
                        return boxedUnit;
                        catch (Throwable throwable) {
                            this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$$outer().error((Function0<String>)new Serializable(this, topic$2, partitionId$2){
                                public static final long serialVersionUID;
                                private final $anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2 $outer;
                                private final String topic$2;
                                private final int partitionId$2;

                                static {
                                    long l = serialVersionUID = 0L;
                                }

                                public final String apply() {
                                    return Predef$.MODULE$.augmentString("Error getting offset for partition [%s,%d] to broker %d").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.topic$2, BoxesRunTime.boxToInteger((int)this.partitionId$2), BoxesRunTime.boxToInteger((int)this.$outer.kafka$server$AbstractFetcherThread$$anonfun$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$anonfun$$$outer().kafka$server$AbstractFetcherThread$$sourceBroker.id())}));
                                }
                                {
                                    if ($outer == null) {
                                        throw new NullPointerException();
                                    }
                                    this.$outer = $outer;
                                    this.topic$2 = string;
                                    this.partitionId$2 = n;
                                }
                            }, (Function0<Throwable>)new Serializable(this, throwable){
                                public static final long serialVersionUID;
                                private final Throwable e$1;

                                static {
                                    long l = serialVersionUID = 0L;
                                }

                                public final Throwable apply() {
                                    return this.e$1;
                                }
                                {
                                    this.e$1 = throwable;
                                }
                            });
                            exceptionResult4 = this.$outer.partitionsWithError$1.$plus$eq((Object)topicAndPartition2);
                        }
                        boxedUnit = exceptionResult4;
                        return boxedUnit;
                    }

                    public $anonfun$processFetchRequest$1 kafka$server$AbstractFetcherThread$$anonfun$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                });
            }

            public AbstractFetcherThread kafka$server$AbstractFetcherThread$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.fetchRequest$1 = fetchRequest;
                this.partitionsWithError$1 = hashSet;
                this.response$1 = objectRef;
            }
        });
        if (partitionsWithError$1.size() > 0) {
            this.debug((Function0<String>)new Serializable(this, partitionsWithError$1){
                public static final long serialVersionUID;
                private final HashSet partitionsWithError$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return Predef$.MODULE$.augmentString("handling partitions with error for %s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.partitionsWithError$1}));
                }
                {
                    this.partitionsWithError$1 = hashSet;
                }
            });
            this.handlePartitionsWithErrors((Iterable<TopicAndPartition>)partitionsWithError$1);
        }
    }

    public void addPartitions(Map<TopicAndPartition, Object> partitionAndOffsets) {
        this.partitionMapLock().lockInterruptibly();
        try {
            ((IterableLike)partitionAndOffsets.filter((Function1)new Serializable(this){
                public static final long serialVersionUID;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final boolean apply(Tuple2<TopicAndPartition, Object> tuple2) {
                    Tuple2<TopicAndPartition, Object> tuple22 = tuple2;
                    return tuple22 != null;
                }
            })).foreach((Function1)new Serializable(this){
                public static final long serialVersionUID;
                private final AbstractFetcherThread $outer;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Object apply(Tuple2<TopicAndPartition, Object> tuple2) {
                    Tuple2<TopicAndPartition, Object> tuple22 = tuple2;
                    if (tuple22 != null) {
                        TopicAndPartition topicAndPartition = (TopicAndPartition)tuple22._1();
                        long l = BoxesRunTime.unboxToLong((Object)tuple22._2());
                        TopicAndPartition topicAndPartition2 = topicAndPartition;
                        long offset = l;
                        return this.$outer.kafka$server$AbstractFetcherThread$$partitionMap().contains((Object)topicAndPartition2) ? BoxedUnit.UNIT : this.$outer.kafka$server$AbstractFetcherThread$$partitionMap().put((Object)topicAndPartition2, (Object)(PartitionTopicInfo$.MODULE$.isOffsetInvalid(offset) ? BoxesRunTime.boxToLong((long)this.$outer.handleOffsetOutOfRange(topicAndPartition2)) : BoxesRunTime.boxToLong((long)offset)));
                    }
                    throw new MatchError(tuple22);
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            });
            this.kafka$server$AbstractFetcherThread$$partitionMapCond().signalAll();
            return;
        }
        finally {
            this.partitionMapLock().unlock();
        }
    }

    public void removePartitions(Set<TopicAndPartition> topicAndPartitions) {
        this.partitionMapLock().lockInterruptibly();
        try {
            topicAndPartitions.foreach((Function1)new Serializable(this){
                public static final long serialVersionUID;
                private final AbstractFetcherThread $outer;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Option<Object> apply(TopicAndPartition tp) {
                    return this.$outer.kafka$server$AbstractFetcherThread$$partitionMap().remove((Object)tp);
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            });
            return;
        }
        finally {
            this.partitionMapLock().unlock();
        }
    }

    public int partitionCount() {
        this.partitionMapLock().lockInterruptibly();
        int exceptionResult2 = 0;
        try {
            exceptionResult2 = this.kafka$server$AbstractFetcherThread$$partitionMap().size();
            return exceptionResult2;
        }
        finally {
            this.partitionMapLock().unlock();
        }
    }

    public AbstractFetcherThread(String name, String clientId, Broker sourceBroker, int socketTimeout, int socketBufferSize, int fetchSize, int fetcherBrokerId, int maxWait, int minBytes, boolean isInterruptible) {
        this.kafka$server$AbstractFetcherThread$$sourceBroker = sourceBroker;
        this.kafka$server$AbstractFetcherThread$$fetchSize = fetchSize;
        super(name, isInterruptible);
        this.kafka$server$AbstractFetcherThread$$partitionMap = new HashMap();
        this.partitionMapLock = new ReentrantLock();
        this.kafka$server$AbstractFetcherThread$$partitionMapCond = this.partitionMapLock().newCondition();
        this.simpleConsumer = new SimpleConsumer(sourceBroker.host(), sourceBroker.port(), socketTimeout, socketBufferSize, clientId);
        this.metricId = new ClientIdAndBroker(clientId, sourceBroker.host(), sourceBroker.port());
        this.fetcherStats = new FetcherStats(this.metricId());
        this.fetcherLagStats = new FetcherLagStats(this.metricId());
        this.fetchRequestBuilder = new FetchRequestBuilder().clientId(clientId).replicaId(fetcherBrokerId).maxWait(maxWait).minBytes(minBytes);
    }
}

