/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import java.util.concurrent.BlockingQueue;
import kafka.api.LeaderAndIsrResponse$;
import kafka.api.RequestKeys$;
import kafka.api.RequestOrResponse;
import kafka.api.StopReplicaResponse$;
import kafka.api.UpdateMetadataResponse$;
import kafka.cluster.Broker;
import kafka.controller.ControllerContext;
import kafka.controller.KafkaController;
import kafka.controller.KafkaController$;
import kafka.network.BlockingChannel;
import kafka.network.Receive;
import kafka.utils.ShutdownableThread;
import kafka.utils.ShutdownableThread$;
import kafka.utils.Utils$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.ScalaObject;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005\u0015a\u0001B\u0001\u0003\u0001\u001d\u0011\u0011CU3rk\u0016\u001cHoU3oIRC'/Z1e\u0015\t\u0019A!\u0001\u0006d_:$(o\u001c7mKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001B\u0004\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\tQ!\u001e;jYNL!!\u0004\u0006\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\r\u001a\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0002#\u0005)1oY1mC&\u00111\u0003\u0005\u0002\f'\u000e\fG.Y(cU\u0016\u001cG\u000f\u0003\u0005\u0016\u0001\t\u0015\r\u0011\"\u0001\u0017\u00031\u0019wN\u001c;s_2dWM]%e+\u00059\u0002CA\b\u0019\u0013\tI\u0002CA\u0002J]RD\u0001b\u0007\u0001\u0003\u0002\u0003\u0006IaF\u0001\u000eG>tGO]8mY\u0016\u0014\u0018\n\u001a\u0011\t\u0011u\u0001!Q1A\u0005\u0002y\t\u0011cY8oiJ|G\u000e\\3s\u0007>tG/\u001a=u+\u0005y\u0002C\u0001\u0011\"\u001b\u0005\u0011\u0011B\u0001\u0012\u0003\u0005E\u0019uN\u001c;s_2dWM]\"p]R,\u0007\u0010\u001e\u0005\tI\u0001\u0011\t\u0011)A\u0005?\u0005\u00112m\u001c8ue>dG.\u001a:D_:$X\r\u001f;!\u0011!1\u0003A!b\u0001\n\u00039\u0013\u0001\u0003;p\u0005J|7.\u001a:\u0016\u0003!\u0002\"!\u000b\u0017\u000e\u0003)R!a\u000b\u0003\u0002\u000f\rdWo\u001d;fe&\u0011QF\u000b\u0002\u0007\u0005J|7.\u001a:\t\u0011=\u0002!\u0011!Q\u0001\n!\n\u0011\u0002^8Ce>\\WM\u001d\u0011\t\u0011E\u0002!Q1A\u0005\u0002I\nQ!];fk\u0016,\u0012a\r\t\u0004imjT\"A\u001b\u000b\u0005Y:\u0014AC2p]\u000e,(O]3oi*\u0011\u0001(O\u0001\u0005kRLGNC\u0001;\u0003\u0011Q\u0017M^1\n\u0005q*$!\u0004\"m_\u000e\\\u0017N\\4Rk\u0016,X\r\u0005\u0003\u0010}\u00013\u0015BA 
\u0011\u0005\u0019!V\u000f\u001d7feA\u0011\u0011\tR\u0007\u0002\u0005*\u00111\tB\u0001\u0004CBL\u0017BA#C\u0005E\u0011V-];fgR|%OU3ta>t7/\u001a\t\u0005\u001f\u001d\u0003\u0015*\u0003\u0002I!\tIa)\u001e8di&|g.\r\t\u0003\u001f)K!a\u0013\t\u0003\tUs\u0017\u000e\u001e\u0005\t\u001b\u0002\u0011\t\u0011)A\u0005g\u00051\u0011/^3vK\u0002B\u0001b\u0014\u0001\u0003\u0006\u0004%\t\u0001U\u0001\bG\"\fgN\\3m+\u0005\t\u0006C\u0001*V\u001b\u0005\u0019&B\u0001+\u0005\u0003\u001dqW\r^<pe.L!AV*\u0003\u001f\tcwnY6j]\u001e\u001c\u0005.\u00198oK2D\u0001\u0002\u0017\u0001\u0003\u0002\u0003\u0006I!U\u0001\tG\"\fgN\\3mA!)!\f\u0001C\u00017\u00061A(\u001b8jiz\"b\u0001X/_?\u0002\f\u0007C\u0001\u0011\u0001\u0011\u0015)\u0012\f1\u0001\u0018\u0011\u0015i\u0012\f1\u0001 \u0011\u00151\u0013\f1\u0001)\u0011\u0015\t\u0014\f1\u00014\u0011\u0015y\u0015\f1\u0001R\u0011\u001d\u0019\u0007A1A\u0005\n\u0011\fA\u0001\\8dWV\tQ\r\u0005\u0002gS6\tqM\u0003\u0002is\u0005!A.\u00198h\u0013\tQwM\u0001\u0004PE*,7\r\u001e\u0005\u0007Y\u0002\u0001\u000b\u0011B3\u0002\u000b1|7m\u001b\u0011\t\u000f9\u0004!\u0019!C\u0005_\u0006\t2\u000f^1uK\u000eC\u0017M\\4f\u0019><w-\u001a:\u0016\u0003A\u0004\"!\u001d;\u000f\u0005\u0001\u0012\u0018BA:\u0003\u0003=Y\u0015MZ6b\u0007>tGO]8mY\u0016\u0014\u0018BA;w\u0005E\u0019F/\u0019;f\u0007\"\fgnZ3M_\u001e<WM\u001d\u0006\u0003g\nAa\u0001\u001f\u0001!\u0002\u0013\u0001\u0018AE:uCR,7\t[1oO\u0016dunZ4fe\u0002BQA\u001f\u0001\u0005Bm\fa\u0001Z8X_J\\G#A%\t\u000bu\u0004A\u0011\u0002@\u0002\u001f\r|gN\\3diR{'I]8lKJ$B!S@\u0002\u0004!1\u0011\u0011\u0001?A\u0002!\naA\u0019:pW\u0016\u0014\b\"B(}\u0001\u0004\t\u0006")
/**
 * Worker thread that drains a queue of (request, callback) pairs and sends each
 * request to a single broker over a {@link BlockingChannel}.
 *
 * <p>Each {@link #doWork()} invocation handles one queue item: it sends the
 * request, retrying (with reconnect) until a response is received or the thread
 * stops running, decodes the response according to the request id, logs it, and
 * hands it to the optional callback.
 *
 * <p>NOTE: this file is decompiler output of a Scala class. The anonymous
 * {@code Serializable} classes are the compiled forms of Scala by-name/closure
 * arguments (lazy log messages), and the nested labeled blocks are the compiled
 * form of a pattern match. They are kept as emitted.
 */
public class RequestSendThread
extends ShutdownableThread
implements ScalaObject {
    private final int controllerId;
    private final ControllerContext controllerContext;
    private final Broker toBroker;
    private final BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue;
    private final BlockingChannel channel;
    // Monitor held for the whole send/receive/callback sequence in doWork().
    private final Object lock;
    private final KafkaController.StateChangeLogger stateChangeLogger;

    /** Returns the id of the controller this thread sends requests on behalf of. */
    public int controllerId() {
        return this.controllerId;
    }

    /** Returns the controller context (read here for the current epoch in log messages). */
    public ControllerContext controllerContext() {
        return this.controllerContext;
    }

    /** Returns the broker that requests are sent to. */
    public Broker toBroker() {
        return this.toBroker;
    }

    /** Returns the queue of pending (request, callback) pairs consumed by {@link #doWork()}. */
    public BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue() {
        return this.queue;
    }

    /** Returns the channel used to send requests and receive responses. */
    public BlockingChannel channel() {
        return this.channel;
    }

    private Object lock() {
        return this.lock;
    }

    private KafkaController.StateChangeLogger stateChangeLogger() {
        return this.stateChangeLogger;
    }

    /**
     * Processes one queued request.
     *
     * <p>Blocks on the queue for the next item, then, while holding the internal
     * lock, sends the request and waits for the response, retrying until a send
     * succeeds or the thread is no longer running. On success the response is
     * decoded (LeaderAndIsr, StopReplica or UpdateMetadata), trace-logged, and
     * passed to the callback when one was supplied.
     *
     * <p>If the retry loop ends without a successful send (the thread stopped
     * running), the request is abandoned: the channel is disconnected and the
     * method returns without decoding or invoking the callback. Previously this
     * case dereferenced a null receive and surfaced as a logged
     * {@link NullPointerException}.
     *
     * <p>Any other {@link Throwable} is logged as an error and the channel is
     * disconnected; nothing is rethrown.
     *
     * @throws MatchError never escapes (it is caught and logged), but is raised
     *         internally when the request id is not one of the three known keys
     */
    @Override
    public void doWork() {
        block7: {
            // Blocks until an item is available.
            Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>> queueItem = this.queue().take();
            RequestOrResponse request$2 = (RequestOrResponse)queueItem._1();
            Function1 callback = (Function1)queueItem._2();
            // Holder filled in by liftedTree1$1 once a response has been received.
            ObjectRef receive$1 = new ObjectRef(null);
            try {
                Object object = this.lock();
                synchronized (object) {
                    short s;
                    block11: {
                        ObjectRef response$1;
                        block9: {
                            block10: {
                                block8: {
                                    BooleanRef isSendSuccessful$1 = new BooleanRef(false);
                                    // Retry until one send/receive round trip succeeds or the thread stops running.
                                    while (this.isRunning().get() && !isSendSuccessful$1.elem) {
                                        this.liftedTree1$1(request$2, receive$1, isSendSuccessful$1);
                                    }
                                    if (!isSendSuccessful$1.elem) {
                                        // The loop ended because the thread is no longer running, so there is
                                        // no response to decode (receive$1.elem is still null). Disconnect the
                                        // channel, as the old failure path did, and abandon this request
                                        // instead of failing with a NullPointerException below.
                                        this.channel().disconnect();
                                        break block7;
                                    }
                                    response$1 = new ObjectRef(null);
                                    s = BoxesRunTime.unboxToShort((Object)request$2.requestId().get());
                                    // Compiled pattern match on the request id: pick the matching response decoder.
                                    if (!BoxesRunTime.equals((Object)BoxesRunTime.boxToShort((short)RequestKeys$.MODULE$.LeaderAndIsrKey()), (Object)BoxesRunTime.boxToShort((short)s))) break block8;
                                    response$1.elem = LeaderAndIsrResponse$.MODULE$.readFrom(((Receive)receive$1.elem).buffer());
                                    break block9;
                                }
                                if (!BoxesRunTime.equals((Object)BoxesRunTime.boxToShort((short)RequestKeys$.MODULE$.StopReplicaKey()), (Object)BoxesRunTime.boxToShort((short)s))) break block10;
                                response$1.elem = StopReplicaResponse$.MODULE$.readFrom(((Receive)receive$1.elem).buffer());
                                break block9;
                            }
                            if (!BoxesRunTime.equals((Object)BoxesRunTime.boxToShort((short)RequestKeys$.MODULE$.UpdateMetadataKey()), (Object)BoxesRunTime.boxToShort((short)s))) break block11;
                            response$1.elem = UpdateMetadataResponse$.MODULE$.readFrom(((Receive)receive$1.elem).buffer());
                        }
                        // Lazily-built trace message describing the decoded response.
                        this.stateChangeLogger().trace((Function0<String>)new Serializable(this, response$1){
                            public static final long serialVersionUID;
                            private final RequestSendThread $outer;
                            private final ObjectRef response$1;

                            static {
                                long l = serialVersionUID = 0L;
                            }

                            public final String apply() {
                                return Predef$.MODULE$.augmentString("Controller %d epoch %d received response %s for a request sent to broker %s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), BoxesRunTime.boxToInteger((int)this.$outer.controllerContext().epoch()), ((RequestOrResponse)this.response$1.elem).toString(), this.$outer.toBroker().toString()}));
                            }
                            {
                                if ($outer == null) {
                                    throw new NullPointerException();
                                }
                                this.$outer = $outer;
                                this.response$1 = objectRef;
                            }
                        });
                        // The callback is optional; it runs while the lock is still held.
                        Object object2 = callback == null ? BoxedUnit.UNIT : callback.apply((Object)((RequestOrResponse)response$1.elem));
                        break block7;
                    }
                    // Reached only when the request id matched none of the known keys.
                    throw new MatchError((Object)BoxesRunTime.boxToShort((short)s));
                }
            }
            catch (Throwable throwable) {
                // Log the failure with its cause, then drop the connection.
                this.error((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID;
                    private final RequestSendThread $outer;

                    static {
                        long l = serialVersionUID = 0L;
                    }

                    public final String apply() {
                        return Predef$.MODULE$.augmentString("Controller %d fails to send a request to broker %s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.$outer.toBroker().toString()}));
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                }, (Function0<Throwable>)new Serializable(this, throwable){
                    public static final long serialVersionUID;
                    private final Throwable e$3;

                    static {
                        long l = serialVersionUID = 0L;
                    }

                    public final Throwable apply() {
                        return this.e$3;
                    }
                    {
                        this.e$3 = throwable;
                    }
                });
                this.channel().disconnect();
            }
        }
    }

    /**
     * Connects the given channel and logs the outcome.
     *
     * <p>On success an info message is logged. On any {@link Throwable} the
     * channel is disconnected and an error is logged; the failure is not
     * rethrown, so callers cannot tell from this method whether the connection
     * was established.
     *
     * @param broker$3 the broker being connected to (used only in log messages)
     * @param channel  the channel to connect
     */
    private void connectToBroker(Broker broker$3, BlockingChannel channel) {
        try {
            channel.connect();
            this.info((Function0<String>)new Serializable(this, broker$3){
                public static final long serialVersionUID;
                private final RequestSendThread $outer;
                private final Broker broker$3;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return Predef$.MODULE$.augmentString("Controller %d connected to %s for sending state change requests").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.broker$3.toString()}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.broker$3 = broker;
                }
            });
        }
        catch (Throwable throwable) {
            // Leave the channel in a disconnected state before reporting the failure.
            channel.disconnect();
            this.error((Function0<String>)new Serializable(this, broker$3){
                public static final long serialVersionUID;
                private final RequestSendThread $outer;
                private final Broker broker$3;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return Predef$.MODULE$.augmentString("Controller %d's connection to broker %s was unsuccessful").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.broker$3.toString()}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.broker$3 = broker;
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID;
                private final Throwable e$4;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Throwable apply() {
                    return this.e$4;
                }
                {
                    this.e$4 = throwable;
                }
            });
        }
    }

    /**
     * One send/receive attempt; compiler-lifted body of the try/catch inside the
     * retry loop of {@link #doWork()}.
     *
     * <p>On success the received response is stored in {@code objectRef.elem}
     * and {@code booleanRef.elem} is set to {@code true}. On any
     * {@link Throwable} a warning is logged, the channel is disconnected and
     * reconnected, {@code booleanRef.elem} is set to {@code false}, and the
     * thread sleeps 300 ms before returning so the caller can retry.
     *
     * @param requestOrResponse the request to send
     * @param objectRef         out-parameter receiving the channel's response
     * @param booleanRef        out-parameter receiving the success flag
     */
    private final void liftedTree1$1(RequestOrResponse requestOrResponse, ObjectRef objectRef, BooleanRef booleanRef) {
        try {
            this.channel().send(requestOrResponse);
            objectRef.elem = this.channel().receive();
            booleanRef.elem = true;
        }
        catch (Throwable throwable) {
            this.warn((Function0<String>)new Serializable(this, requestOrResponse){
                public static final long serialVersionUID;
                private final RequestSendThread $outer;
                private final RequestOrResponse request$2;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return Predef$.MODULE$.augmentString("Controller %d epoch %d fails to send request %s to broker %s. Reconnecting to broker.").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), BoxesRunTime.boxToInteger((int)this.$outer.controllerContext().epoch()), this.request$2.toString(), this.$outer.toBroker().toString()}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.request$2 = requestOrResponse;
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID;
                private final Throwable e$2;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Throwable apply() {
                    return this.e$2;
                }
                {
                    this.e$2 = throwable;
                }
            });
            // Reset the connection so the next attempt starts from a fresh channel.
            this.channel().disconnect();
            this.connectToBroker(this.toBroker(), this.channel());
            booleanRef.elem = false;
            // Back off for 300 ms before the caller retries.
            // NOTE(review): Utils.swallow presumably suppresses anything the sleep throws — confirm.
            Utils$.MODULE$.swallow((Function0<BoxedUnit>)new Serializable(this){
                public static final long serialVersionUID;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final void apply() {
                    this.apply$mcV$sp();
                }

                public void apply$mcV$sp() {
                    Thread.sleep(300L);
                }
            });
        }
    }

    /**
     * Creates the send thread and immediately attempts to connect the channel.
     *
     * <p>The thread is named {@code Controller-<controllerId>-to-broker-<brokerId>-send-thread}.
     * A failed initial connection is logged by {@code connectToBroker} and does
     * not prevent construction.
     *
     * @param controllerId      id of the owning controller
     * @param controllerContext controller context, read for the epoch in log messages
     * @param toBroker          broker that requests are sent to
     * @param queue             source of (request, callback) pairs; the callback may be null
     * @param channel           channel used for all sends and receives
     */
    public RequestSendThread(int controllerId, ControllerContext controllerContext, Broker toBroker, BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue, BlockingChannel channel) {
        // Decompiler artifact: the Scala compiler stores constructor fields before calling super.
        this.controllerId = controllerId;
        this.controllerContext = controllerContext;
        this.toBroker = toBroker;
        this.queue = queue;
        this.channel = channel;
        // Second argument is ShutdownableThread's own default for its second constructor parameter.
        super(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)controllerId), BoxesRunTime.boxToInteger((int)toBroker.id())})), ShutdownableThread$.MODULE$.init$default$2());
        this.lock = new Object();
        this.stateChangeLogger = KafkaController$.MODULE$.stateChangeLogger();
        this.connectToBroker(toBroker, channel);
    }
}

