/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import kafka.common.InterBrokerSendThread;
import kafka.common.RequestAndCompletionHandler;
import kafka.server.BrokerToControllerQueueItem;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001\u0005Uf\u0001B\u000f\u001f\u0001\rB\u0001B\u000b\u0001\u0003\u0002\u0003\u0006Ia\u000b\u0005\tm\u0001\u0011\t\u0019!C\u0001o!Aa\b\u0001BA\u0002\u0013\u0005q\b\u0003\u0005F\u0001\t\u0005\t\u0015)\u00039\u0011!1\u0005A!A!\u0002\u00139\u0005\u0002\u0003(\u0001\u0005\u0003\u0005\u000b\u0011B(\t\u0011I\u0003!\u0011!Q\u0001\nMC\u0001B\u0016\u0001\u0003\u0002\u0003\u0006Ia\u0016\u0005\t5\u0002\u0011\t\u0011)A\u00057\"I!\r\u0001B\u0001B\u0003%1M\u001c\u0005\tg\u0002\u0011\t\u0011)A\u0005i\")q\u000f\u0001C\u0001q\"9\u0011q\u0001\u0001\u0005\n\u0005%\u0001\"CA\b\u0001\t\u0007I\u0011BA\t\u0011!\ti\u0003\u0001Q\u0001\n\u0005M\u0001\"CA\u0018\u0001\t\u0007I\u0011BA\u0019\u0011!\t9\u0005\u0001Q\u0001\n\u0005M\u0002\"CA%\u0001\u0001\u0007I\u0011\u0001\u00108\u0011)\tY\u0005\u0001a\u0001\n\u0003q\u0012Q\n\u0005\b\u0003#\u0002\u0001\u0015)\u00039\u0011\u001d\tY\u0006\u0001C\u0001\u0003;Bq!!\u001a\u0001\t\u0013\t9\u0007C\u0004\u0002n\u0001!\t!a\u001c\t\u000f\u0005U\u0004\u0001\"\u0001\u0002x!9\u0011q\u0010\u0001\u0005B\u0005\u0005\u0005\u0002CAN\u0001\u0011\u0005a$!(\t\u000f\u0005=\u0006\u0001\"\u0011\u00022\"9\u00111\u0017\u0001\u0005B\u0005E&a\b\"s_.,'\u000fV8D_:$(o\u001c7mKJ\u0014V-];fgR$\u0006N]3bI*\u0011q\u0004I\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\u0005\nQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001IA\u0011Q\u0005K\u0007\u0002M)\u0011q\u0005I\u0001\u0007G>lWn\u001c8\n\u0005%2#!F%oi\u0016\u0014(I]8lKJ\u001cVM\u001c3UQJ,\u0017\rZ\u0001\u0015S:LG/[1m\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\u0011\u00051\"T\"A\u0017\u000b\u00059z\u0013aB2mS\u0016tGo\u001d\u0006\u0003CAR!!\r\u001a\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0019\u0014aA8sO&\u0011Q'\f\u0002\f\u0017\u000647.Y\"mS\u0016tG/\u0001\u0010jg:+Go^8sW\u000ec\u0017.\u001a8u\r>\u0014(l[\"p]R\u0014x\u000e\u001c7feV\t\u0001\b\u0005\u0002:y5\t!HC\u0001<\u0003\u0015\u00198-\u00197b\u0013\ti$HA\u0004C_>dW-\u00198\u0002E%\u001ch*\u001a;x_J\\7\t\\5f]R4uN\u001d.l\u0007>tGO]8mY\u0016\u0014x\fJ3r)\t\u00015\t\u0005\u0002:\u0003&\u0011!I\u000f\u0002\u0005+:LG\u000fC\u0004E\u0007\u0005\u0005\t\u0019\u0001\u001d\u0002\u0007a$\u0013'A\u0010jg:+Go^8sW\u000ec\u0017.\u001a8u\r>\u0014(l[\"p]R\u0014x\u000e\u001c7fe\u0002\nAC\\3uo>\u00148n\u00117jK:$h)Y2u_JL\b\u0003B\u001dI\u0015.J!!\u0013\u001e\u0003\u0013\u0019+hn\u0019;j_:\f\u0004CA&M\u001b\u0005q\u0012BA'\u001f\u0005U\u0019uN\u001c;s_2dWM]%oM>\u0014X.\u0019;j_:\fq\"\\3uC\u0012\fG/Y+qI\u0006$XM\u001d\t\u0003YAK!!U\u0017\u0003+5\u000bg.^1m\u001b\u0016$\u0018\rZ1uCV\u0003H-\u0019;fe\u000612m\u001c8ue>dG.\u001a:O_\u0012,\u0007K]8wS\u0012,'\u000f\u0005\u0002L)&\u0011QK\b\u0002\u0017\u0007>tGO]8mY\u0016\u0014hj\u001c3f!J|g/\u001b3fe\u000611m\u001c8gS\u001e\u0004\"a\u0013-\n\u0005es\"aC&bM.\f7i\u001c8gS\u001e\fA\u0001^5nKB\u0011A\fY\u0007\u0002;*\u0011alX\u0001\u0006kRLGn\u001d\u0006\u0003O=J!!Y/\u0003\tQKW.Z\u0001\u000bi\"\u0014X-\u00193OC6,\u0007C\u00013l\u001d\t)\u0017\u000e\u0005\u0002gu5\tqM\u0003\u0002iE\u00051AH]8pizJ!A\u001b\u001e\u0002\rA\u0013X\rZ3g\u0013\taWN\u0001\u0004TiJLgn\u001a\u0006\u0003UjJ!a\u001c9\u0002\t9\fW.Z\u0005\u0003cJ\u0014!c\u00155vi\u0012|wO\\1cY\u0016$\u0006N]3bI*\u0011a\fI\u0001\u000fe\u0016$(/\u001f+j[\u0016|W\u000f^'t!\tIT/\u0003\u0002wu\t!Aj\u001c8h\u0003\u0019a\u0014N\\5u}Qi\u0011P_>}{z|\u0018\u0011AA\u0002\u0003\u000b\u0001\"a\u0013\u0001\t\u000b)b\u0001\u0019A\u0016\t\u000bYb\u0001\u0019\u0001\u001d\t\u000b\u0019c\u0001\u0019A$\t\u000b9c\u0001\u0019A(\t\u000bIc\u0001\u0019A*\t\u000bYc\u0001\u0019A,\t\u000bic\u0001\u0019A.\t\u000b\td\u0001\u0019A2\t\u000bMd\u0001\u0019\u0001;\u0002/5\f\u0017PY3SKN,GOT3uo>\u00148n\u00117jK:$Hc\u0001!\u0002\f!1\u0011QB\u0007A\u0002)\u000bQcY8oiJ|G\u000e\\3s\u0013:4wN]7bi&|g.\u0001\u0007sKF,Xm\u001d;Rk\u0016,X-\u0006\u0002\u0002\u0014A1\u0011QCA\u0012\u0003Oi!!a\u0006\u000b\t\u0005e\u00111D\u0001\u000bG>t7-\u001e:sK:$(\u0002BA\u000f\u0003?\tA!\u001e;jY*\u0011\u0011\u0011E\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002&\u0005]!a\u0005'j].,GM\u00117pG.Lgn\u001a#fcV,\u0007cA&\u0002*%\u0019\u00111\u0006\u0010\u00037\t\u0013xn[3s)>\u001cuN\u001c;s_2dWM])vKV,\u0017\n^3n\u00035\u0011X-];fgR\fV/Z;fA\u0005\u0001\u0012m\u0019;jm\u0016\u001cuN\u001c;s_2dWM]\u000b\u0003\u0003g\u0001b!!\u000e\u0002<\u0005}RBAA\u001c\u0015\u0011\tI$a\u0006\u0002\r\u0005$x.\\5d\u0013\u0011\ti$a\u000e\u0003\u001f\u0005#x.\\5d%\u00164WM]3oG\u0016\u0004B!!\u0011\u0002D5\tq,C\u0002\u0002F}\u0013AAT8eK\u0006\t\u0012m\u0019;jm\u0016\u001cuN\u001c;s_2dWM\u001d\u0011\u0002\u000fM$\u0018M\u001d;fI\u0006Y1\u000f^1si\u0016$w\fJ3r)\r\u0001\u0015q\n\u0005\b\tN\t\t\u00111\u00019\u0003!\u0019H/\u0019:uK\u0012\u0004\u0003f\u0001\u000b\u0002VA\u0019\u0011(a\u0016\n\u0007\u0005e#H\u0001\u0005w_2\fG/\u001b7f\u0003]\t7\r^5wK\u000e{g\u000e\u001e:pY2,'/\u00113ee\u0016\u001c8\u000f\u0006\u0002\u0002`A)\u0011(!\u0019\u0002@%\u0019\u00111\r\u001e\u0003\r=\u0003H/[8o\u0003])\b\u000fZ1uK\u000e{g\u000e\u001e:pY2,'/\u00113ee\u0016\u001c8\u000fF\u0002A\u0003SBq!a\u001b\u0017\u0001\u0004\ty$A\noK^\f5\r^5wK\u000e{g\u000e\u001e:pY2,'/A\u0004f]F,X-^3\u0015\u0007\u0001\u000b\t\bC\u0004\u0002t]\u0001\r!a\n\u0002\u000fI,\u0017/^3ti\u0006I\u0011/^3vKNK'0Z\u000b\u0003\u0003s\u00022!OA>\u0013\r\tiH\u000f\u0002\u0004\u0013:$\u0018\u0001E4f]\u0016\u0014\u0018\r^3SKF,Xm\u001d;t)\t\t\u0019\t\u0005\u0004\u0002\u0006\u0006=\u0015Q\u0013\b\u0005\u0003\u000f\u000bYID\u0002g\u0003\u0013K\u0011aO\u0005\u0004\u0003\u001bS\u0014a\u00029bG.\fw-Z\u0005\u0005\u0003#\u000b\u0019J\u0001\u0005Ji\u0016\u0014\u0018M\u00197f\u0015\r\tiI\u000f\t\u0004K\u0005]\u0015bAAMM\tY\"+Z9vKN$\u0018I\u001c3D_6\u0004H.\u001a;j_:D\u0015M\u001c3mKJ\fa\u0002[1oI2,'+Z:q_:\u001cX\r\u0006\u0003\u0002 \u0006-Fc\u0001!\u0002\"\"9\u00111\u0015\u000eA\u0002\u0005\u0015\u0016\u0001\u0003:fgB|gn]3\u0011\u00071\n9+C\u0002\u0002*6\u0012ab\u00117jK:$(+Z:q_:\u001cX\rC\u0004\u0002.j\u0001\r!a\n\u0002\u0013E,X-^3Ji\u0016l\u0017A\u00023p/>\u00148\u000eF\u0001A\u0003\u0015\u0019H/\u0019:u\u0001")
public class BrokerToControllerRequestThread
extends InterBrokerSendThread {
    private boolean isNetworkClientForZkController;
    private final Function1<ControllerInformation, KafkaClient> networkClientFactory;
    private final ManualMetadataUpdater metadataUpdater;
    private final ControllerNodeProvider controllerNodeProvider;
    private final Time time;
    private final long retryTimeoutMs;
    private final LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue;
    private final AtomicReference<Node> activeController;
    private volatile boolean started;

    public boolean isNetworkClientForZkController() {
        return this.isNetworkClientForZkController;
    }

    public void isNetworkClientForZkController_$eq(boolean x$1) {
        this.isNetworkClientForZkController = x$1;
    }

    private void maybeResetNetworkClient(ControllerInformation controllerInformation) {
        if (this.isNetworkClientForZkController() != controllerInformation.isZkController()) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(88).append("Controller changed to ").append((Object)(this.isNetworkClientForZkController() ? "kraft" : "zk")).append(" mode. ").append("Resetting network client with new controller information : ").append(controllerInformation).toString());
            KafkaClient oldClient = this.networkClient();
            oldClient.initiateClose();
            oldClient.close();
            this.isNetworkClientForZkController_$eq(controllerInformation.isZkController());
            this.updateControllerAddress((Node)controllerInformation.node().orNull(Predef$.MODULE$.$conforms()));
            controllerInformation.node().foreach((Function1 & Serializable & scala.Serializable)n -> {
                this.metadataUpdater.setNodes((java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)n, (List)Nil$.MODULE$)).asJava());
                return BoxedUnit.UNIT;
            });
            this.networkClient_$eq((KafkaClient)this.networkClientFactory.apply((Object)controllerInformation));
        }
    }

    private LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue() {
        return this.requestQueue;
    }

    private AtomicReference<Node> activeController() {
        return this.activeController;
    }

    public boolean started() {
        return this.started;
    }

    public void started_$eq(boolean x$1) {
        this.started = x$1;
    }

    public Option<Node> activeControllerAddress() {
        return Option$.MODULE$.apply((Object)this.activeController().get());
    }

    private void updateControllerAddress(Node newActiveController) {
        this.activeController().set(newActiveController);
    }

    public void enqueue(BrokerToControllerQueueItem request) {
        if (!this.started()) {
            throw new IllegalStateException("Cannot enqueue a request if the request thread is not running");
        }
        this.requestQueue().add(request);
        if (this.activeControllerAddress().isDefined()) {
            this.wakeup();
        }
    }

    public int queueSize() {
        return this.requestQueue().size();
    }

    @Override
    public Iterable<RequestAndCompletionHandler> generateRequests() {
        long currentTimeMs = this.time.milliseconds();
        Iterator<BrokerToControllerQueueItem> requestIter = this.requestQueue().iterator();
        while (requestIter.hasNext()) {
            BrokerToControllerQueueItem request = requestIter.next();
            if (currentTimeMs - request.createdTimeMs() >= this.retryTimeoutMs) {
                requestIter.remove();
                request.callback().onTimeout();
                continue;
            }
            Option<Node> controllerAddress = this.activeControllerAddress();
            if (!controllerAddress.isDefined()) continue;
            requestIter.remove();
            return Option$.MODULE$.option2Iterable((Option)new Some((Object)new RequestAndCompletionHandler(this.time.milliseconds(), (Node)controllerAddress.get(), request.request(), response -> this.handleResponse(request, response))));
        }
        return Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
    }

    public void handleResponse(BrokerToControllerQueueItem queueItem, ClientResponse response) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Request ").append(queueItem.request()).append(" received ").append(response).toString());
        if (response.authenticationException() != null) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(59).append("Request ").append(queueItem.request()).append(" failed due to authentication error with controller").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> response.authenticationException());
            queueItem.callback().onComplete(response);
            return;
        }
        if (response.versionMismatch() != null) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Request ").append(queueItem.request()).append(" failed due to unsupported version error").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> response.versionMismatch());
            queueItem.callback().onComplete(response);
            return;
        }
        if (response.wasDisconnected()) {
            this.updateControllerAddress(null);
            this.requestQueue().putFirst(queueItem);
            return;
        }
        if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(97).append("Request ").append(queueItem.request()).append(" received NOT_CONTROLLER exception. Disconnecting the ").append("connection to the stale controller ").append(this.activeControllerAddress().map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.idString()).getOrElse((Function0 & Serializable & scala.Serializable)() -> "null")).toString());
            this.activeControllerAddress().foreach((Function1 & Serializable & scala.Serializable)controllerAddress -> {
                BrokerToControllerRequestThread.$anonfun$handleResponse$9(this, controllerAddress);
                return BoxedUnit.UNIT;
            });
            this.requestQueue().putFirst(queueItem);
            return;
        }
        queueItem.callback().onComplete(response);
    }

    @Override
    public void doWork() {
        ControllerInformation controllerInformation = this.controllerNodeProvider.getControllerInfo();
        this.maybeResetNetworkClient(controllerInformation);
        if (this.activeControllerAddress().isDefined()) {
            super.pollOnce(Long.MAX_VALUE);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Controller isn't cached, looking for local metadata changes");
        Option<Node> option = controllerInformation.node();
        if (option instanceof Some) {
            Node controllerNode = (Node)((Some)option).value();
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(51).append("Recorded new controller, from now on will use node ").append(controllerNode).toString());
            this.updateControllerAddress(controllerNode);
            this.metadataUpdater.setNodes((java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)controllerNode, (List)Nil$.MODULE$)).asJava());
            return;
        }
        if (None$.MODULE$.equals(option)) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "No controller provided, retrying after backoff");
            super.pollOnce(100L);
            return;
        }
        throw new MatchError(option);
    }

    @Override
    public void start() {
        super.start();
        this.started_$eq(true);
    }

    public static final /* synthetic */ void $anonfun$handleResponse$9(BrokerToControllerRequestThread $this, Node controllerAddress) {
        try {
            $this.networkClient().disconnect(controllerAddress.idString());
        }
        catch (Throwable t) {
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Had an error while disconnecting from NetworkClient.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> t);
        }
        $this.updateControllerAddress(null);
    }

    public BrokerToControllerRequestThread(KafkaClient initialNetworkClient, boolean isNetworkClientForZkController, Function1<ControllerInformation, KafkaClient> networkClientFactory, ManualMetadataUpdater metadataUpdater, ControllerNodeProvider controllerNodeProvider, KafkaConfig config, Time time, String threadName, long retryTimeoutMs) {
        this.isNetworkClientForZkController = isNetworkClientForZkController;
        this.networkClientFactory = networkClientFactory;
        this.metadataUpdater = metadataUpdater;
        this.controllerNodeProvider = controllerNodeProvider;
        this.time = time;
        this.retryTimeoutMs = retryTimeoutMs;
        super(threadName, initialNetworkClient, (int)Math.min(Integer.MAX_VALUE, Math.min((long)config.controllerSocketTimeoutMs(), retryTimeoutMs)), time, false);
        this.requestQueue = new LinkedBlockingDeque();
        this.activeController = new AtomicReference<Object>(null);
        this.started = false;
    }
}

