/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kafka.cluster.Broker;
import kafka.controller.ControllerBrokerStateInfo;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.QueueItem;
import kafka.controller.RequestSendThread;
import kafka.controller.StateChangeLogger;
import kafka.metrics.KafkaMetricsGroup;
import kafka.server.KafkaConfig;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.HashMap;
import scala.math.Numeric;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005-x!B\u0001\u0003\u0011\u00039\u0011\u0001G\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\u001c8fY6\u000bg.Y4fe*\u00111\u0001B\u0001\u000bG>tGO]8mY\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001A\u0011\u0001\"C\u0007\u0002\u0005\u0019)!B\u0001E\u0001\u0017\tA2i\u001c8ue>dG.\u001a:DQ\u0006tg.\u001a7NC:\fw-\u001a:\u0014\u0005%a\u0001CA\u0007\u0011\u001b\u0005q!\"A\b\u0002\u000bM\u001c\u0017\r\\1\n\u0005Eq!AB!osJ+g\rC\u0003\u0014\u0013\u0011\u0005A#\u0001\u0004=S:LGO\u0010\u000b\u0002\u000f!9a#\u0003b\u0001\n\u00039\u0012aE)vKV,7+\u001b>f\u001b\u0016$(/[2OC6,W#\u0001\r\u0011\u0005eqR\"\u0001\u000e\u000b\u0005ma\u0012\u0001\u00027b]\u001eT\u0011!H\u0001\u0005U\u00064\u0018-\u0003\u0002 5\t11\u000b\u001e:j]\u001eDa!I\u0005!\u0002\u0013A\u0012\u0001F)vKV,7+\u001b>f\u001b\u0016$(/[2OC6,\u0007\u0005C\u0004$\u0013E\u0005I\u0011\u0001\u0013\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00137+\u0005)#F\u0001\u00144!\riq%K\u0005\u0003Q9\u0011aa\u00149uS>t\u0007C\u0001\u00162\u001d\tYs\u0006\u0005\u0002-\u001d5\tQF\u0003\u0002/\r\u00051AH]8pizJ!\u0001\r\b\u0002\rA\u0013X\rZ3g\u0013\ty\"G\u0003\u00021\u001d-\nA\u0007\u0005\u00026u5\taG\u0003\u00028q\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0003s9\t!\"\u00198o_R\fG/[8o\u0013\tYdGA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u00164AA\u0003\u0002\u0001{M!A\b\u0004 
E!\ty$)D\u0001A\u0015\t\tE!A\u0003vi&d7/\u0003\u0002D\u0001\n9Aj\\4hS:<\u0007CA#I\u001b\u00051%BA$\u0005\u0003\u001diW\r\u001e:jGNL!!\u0013$\u0003#-\u000bgm[1NKR\u0014\u0018nY:He>,\b\u000f\u0003\u0005Ly\t\u0005\t\u0015!\u0003M\u0003E\u0019wN\u001c;s_2dWM]\"p]R,\u0007\u0010\u001e\t\u0003\u00115K!A\u0014\u0002\u0003#\r{g\u000e\u001e:pY2,'oQ8oi\u0016DH\u000f\u0003\u0005Qy\t\u0005\t\u0015!\u0003R\u0003\u0019\u0019wN\u001c4jOB\u0011!+V\u0007\u0002'*\u0011A\u000bB\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005Y\u001b&aC&bM.\f7i\u001c8gS\u001eD\u0001\u0002\u0017\u001f\u0003\u0002\u0003\u0006I!W\u0001\u0005i&lW\r\u0005\u0002[G6\t1L\u0003\u0002B9*\u0011QLX\u0001\u0007G>lWn\u001c8\u000b\u0005\u0015y&B\u00011b\u0003\u0019\t\u0007/Y2iK*\t!-A\u0002pe\u001eL!\u0001Z.\u0003\tQKW.\u001a\u0005\t\u000fr\u0012\t\u0011)A\u0005MB\u0011q-[\u0007\u0002Q*\u0011q\tX\u0005\u0003U\"\u0014q!T3ue&\u001c7\u000f\u0003\u0005my\t\u0005\t\u0015!\u0003n\u0003E\u0019H/\u0019;f\u0007\"\fgnZ3M_\u001e<WM\u001d\t\u0003\u00119L!a\u001c\u0002\u0003#M#\u0018\r^3DQ\u0006tw-\u001a'pO\u001e,'\u000f\u0003\u0005ry\t\u0005\t\u0015!\u0003'\u0003A!\bN]3bI:\u000bW.\u001a)sK\u001aL\u0007\u0010C\u0003\u0014y\u0011\u00051\u000fF\u0004ukZ<\b0\u001f>\u0011\u0005!a\u0004\"B&s\u0001\u0004a\u0005\"\u0002)s\u0001\u0004\t\u0006\"\u0002-s\u0001\u0004I\u0006\"B$s\u0001\u00041\u0007\"\u00027s\u0001\u0004i\u0007bB9s!\u0003\u0005\rA\n\u0005\byr\u0012\r\u0011\"\u0005~\u0003=\u0011'o\\6feN#\u0018\r^3J]\u001a|W#\u0001@\u0011\u000f}\fI!!\u0004\u0002\u00145\u0011\u0011\u0011\u0001\u0006\u0005\u0003\u0007\t)!A\u0004nkR\f'\r\\3\u000b\u0007\u0005\u001da\"\u0001\u0006d_2dWm\u0019;j_:LA!a\u0003\u0002\u0002\t9\u0001*Y:i\u001b\u0006\u0004\bcA\u0007\u0002\u0010%\u0019\u0011\u0011\u0003\b\u0003\u0007%sG\u000fE\u0002\t\u0003+I1!a\u0006\u0003\u0005e\u0019uN\u001c;s_2dWM\u001d\"s_.,'o\u0015;bi\u0016LeNZ8\t\u000f\u0005mA\b)A\u0005}\u0006\u0001\"M]8lKJ\u001cF/\u0019;f\u0013:4w\u000e\t\u0005\n\u0003?a$\u0019!C\u0005\u0003C\t!B\u0019:pW\u0016\u0014Hj\\2l+\t
\t\u0019\u0003E\u0002\u001a\u0003KI1!a\n\u001b\u0005\u0019y%M[3di\"A\u00111\u0006\u001f!\u0002\u0013\t\u0019#A\u0006ce>\\WM\u001d'pG.\u0004\u0003bBA\u0018y\u0011\u0005\u0011\u0011G\u0001\bgR\f'\u000f^;q)\t\t\u0019\u0004E\u0002\u000e\u0003kI1!a\u000e\u000f\u0005\u0011)f.\u001b;\t\u000f\u0005mB\b\"\u0001\u00022\u0005A1\u000f[;uI><h\u000eC\u0004\u0002@q\"\t!!\u0011\u0002\u0017M,g\u000e\u001a*fcV,7\u000f\u001e\u000b\u000b\u0003g\t\u0019%a\u0012\u0002X\u0005\u001d\u0005\u0002CA#\u0003{\u0001\r!!\u0004\u0002\u0011\t\u0014xn[3s\u0013\u0012D\u0001\"!\u0013\u0002>\u0001\u0007\u00111J\u0001\u0007CBL7*Z=\u0011\t\u00055\u00131K\u0007\u0003\u0003\u001fR1!!\u0015]\u0003!\u0001(o\u001c;pG>d\u0017\u0002BA+\u0003\u001f\u0012q!\u00119j\u0017\u0016L8\u000f\u0003\u0005\u0002Z\u0005u\u0002\u0019AA.\u0003\u001d\u0011X-];fgR\u0004D!!\u0018\u0002vA1\u0011qLA6\u0003crA!!\u0019\u0002h5\u0011\u00111\r\u0006\u0004\u0003Kb\u0016\u0001\u0003:fcV,7\u000f^:\n\t\u0005%\u00141M\u0001\u0010\u0003\n\u001cHO]1diJ+\u0017/^3ti&!\u0011QNA8\u0005\u001d\u0011U/\u001b7eKJTA!!\u001b\u0002dA!\u00111OA;\u0019\u0001!A\"a\u001e\u0002X\u0005\u0005\t\u0011!B\u0001\u0003s\u00121a\u0018\u00132#\u0011\tY(!!\u0011\u00075\ti(C\u0002\u0002\u00009\u0011qAT8uQ&tw\r\u0005\u0003\u0002b\u0005\r\u0015\u0002BAC\u0003G\u0012q\"\u00112tiJ\f7\r\u001e*fcV,7\u000f\u001e\u0005\u000b\u0003\u0013\u000bi\u0004%AA\u0002\u0005-\u0015\u0001C2bY2\u0014\u0017mY6\u0011\u000f5\ti)!%\u00024%\u0019\u0011q\u0012\b\u0003\u0013\u0019+hn\u0019;j_:\f\u0004\u0003BA1\u0003'KA!!&\u0002d\t\u0001\u0012IY:ue\u0006\u001cGOU3ta>t7/\u001a\u0005\b\u00033cD\u0011AAN\u0003%\tG\r\u001a\"s_.,'\u000f\u0006\u0003\u00024\u0005u\u0005\u0002CAP\u0003/\u0003\r!!)\u0002\r\t\u0014xn[3s!\u0011\t\u0019+!+\u000e\u0005\u0005\u0015&bAAT\t\u000591\r\\;ti\u0016\u0014\u0018\u0002BAV\u0003K\u0013aA\u0011:pW\u0016\u0014\bbBAXy\u0011\u0005\u0011\u0011W\u0001\re\u0016lwN^3Ce>\\WM\u001d\u000b\u0005\u0003g\t\u0019\f\u0003\u0005\u0002F\u00055\u0006\u0019AA\u0007\u0011\u001d\t9\f\u0010C\
u0005\u0003s\u000bA\"\u00193e\u001d\u0016<(I]8lKJ$B!a\r\u0002<\"A\u0011qTA[\u0001\u0004\t\t\u000bC\u0004\u0002@r\"I!!1\u0002\u001bE,X-^3TSj,G+Y4t)\u0011\t\u0019-a4\u0011\r\u0005\u0015\u00171\u001a\r\u0019\u001b\t\t9M\u0003\u0003\u0002J\u0006\u0015\u0011!C5n[V$\u0018M\u00197f\u0013\u0011\ti-a2\u0003\u00075\u000b\u0007\u000f\u0003\u0005\u0002F\u0005u\u0006\u0019AA\u0007\u0011\u001d\t\u0019\u000e\u0010C\u0005\u0003+\fAC]3n_Z,W\t_5ti&twM\u0011:pW\u0016\u0014H\u0003BA\u001a\u0003/D\u0001\"!7\u0002R\u0002\u0007\u00111C\u0001\fEJ|7.\u001a:Ti\u0006$X\rC\u0004\u0002^r\"\t\"a8\u0002-M$\u0018M\u001d;SKF,Xm\u001d;TK:$G\u000b\u001b:fC\u0012$B!a\r\u0002b\"A\u0011QIAn\u0001\u0004\ti\u0001C\u0005\u0002fr\n\n\u0011\"\u0001\u0002h\u0006)2/\u001a8e%\u0016\fX/Z:uI\u0011,g-Y;mi\u0012\"TCAAuU\r\tYi\r")
public class ControllerChannelManager
implements KafkaMetricsGroup {
    // Collaborators handed in through the constructor; each is stored as-is.
    private final ControllerContext controllerContext;
    private final KafkaConfig config;
    private final Time time;
    private final Metrics metrics;
    private final StateChangeLogger stateChangeLogger;
    // Optional prefix used by addNewBroker when naming the per-broker send thread.
    private final Option<String> threadNamePrefix;
    // Per-broker connection state, keyed by the boxed broker id.
    private final HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo;
    // Monitor that startup/shutdown/sendRequest/addBroker/removeBroker and the
    // TotalQueueSize gauge synchronize on before touching brokerStateInfo.
    private final Object kafka$controller$ControllerChannelManager$$brokerLock;
    // NOTE(review): loggerName and the log4jController field below are declared final yet
    // are assigned by the _setter_ methods further down; this is how the decompiler
    // rendered the class and would not compile as plain Java.
    private final String loggerName;
    // Lazily created by logger$lzycompute(); valid only once bitmap$0 is true.
    private Logger logger;
    // Prefix for log messages; set in the constructor via logIdent_$eq.
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    // Set to true once 'logger' has been initialised.
    private volatile boolean bitmap$0;

    /**
     * Static forwarder to the {@code ControllerChannelManager$} singleton.
     * The mangled name decodes to {@code <init>$default$6}; presumably this is the default
     * value of the sixth constructor parameter ({@code threadNamePrefix}) — confirm against
     * the companion object.
     */
    public static Option<String> $lessinit$greater$default$6() {
        return ControllerChannelManager$.MODULE$.$lessinit$greater$default$6();
    }

    /**
     * Static forwarder to the {@code ControllerChannelManager$} singleton.
     * The returned name is the one used in this class to register and remove the
     * per-broker queue-size gauge.
     */
    public static String QueueSizeMetricName() {
        return ControllerChannelManager$.MODULE$.QueueSizeMetricName();
    }

    // ---------------------------------------------------------------------------------
    // KafkaMetricsGroup forwarders.
    // Every method in this group simply delegates to the static method of the same name
    // (suffixed with '$') on the KafkaMetricsGroup interface, passing this instance as the
    // first argument. The '$default$N' methods supply the default value for the N-th
    // parameter of the correspondingly named method.
    // ---------------------------------------------------------------------------------
    @Override
    public MetricName metricName(String name, Map<String, String> tags) {
        return KafkaMetricsGroup.metricName$(this, name, tags);
    }

    @Override
    public MetricName explicitMetricName(String group, String typeName, String name, Map<String, String> tags) {
        return KafkaMetricsGroup.explicitMetricName$(this, group, typeName, name, tags);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
        return KafkaMetricsGroup.newGauge$(this, name, metric, tags);
    }

    @Override
    public Meter newMeter(String name, String eventType, TimeUnit timeUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newMeter$(this, name, eventType, timeUnit, tags);
    }

    @Override
    public Histogram newHistogram(String name, boolean biased, Map<String, String> tags) {
        return KafkaMetricsGroup.newHistogram$(this, name, biased, tags);
    }

    @Override
    public Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newTimer$(this, name, durationUnit, rateUnit, tags);
    }

    @Override
    public void removeMetric(String name, Map<String, String> tags) {
        KafkaMetricsGroup.removeMetric$(this, name, tags);
    }

    @Override
    public <T> Map<String, String> newGauge$default$3() {
        return KafkaMetricsGroup.newGauge$default$3$(this);
    }

    @Override
    public Map<String, String> newMeter$default$4() {
        return KafkaMetricsGroup.newMeter$default$4$(this);
    }

    @Override
    public Map<String, String> removeMetric$default$2() {
        return KafkaMetricsGroup.removeMetric$default$2$(this);
    }

    @Override
    public Map<String, String> newTimer$default$4() {
        return KafkaMetricsGroup.newTimer$default$4$(this);
    }

    @Override
    public boolean newHistogram$default$2() {
        return KafkaMetricsGroup.newHistogram$default$2$(this);
    }

    @Override
    public Map<String, String> newHistogram$default$3() {
        return KafkaMetricsGroup.newHistogram$default$3$(this);
    }

    // ---------------------------------------------------------------------------------
    // Logging forwarders.
    // Every method in this group delegates to the static method of the same name
    // (suffixed with '$') on the Logging interface, passing this instance as the first
    // argument. Messages and throwables are supplied as Function0 thunks, so they are
    // only evaluated if the Logging implementation chooses to invoke them.
    // ---------------------------------------------------------------------------------
    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging.trace$(this, e);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging.swallowTrace$(this, action);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging.debug$(this, e);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging.swallowDebug$(this, action);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public Object info(Function0<Throwable> e) {
        return Logging.info$(this, e);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging.swallowInfo$(this, action);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging.warn$(this, e);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging.swallowWarn$(this, action);
    }

    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging.swallow$(this, action);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public Object error(Function0<Throwable> e) {
        return Logging.error$(this, e);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging.swallowError$(this, action);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging.fatal$(this, e);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /** Returns the logger name stored in the {@code loggerName} field. */
    @Override
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Slow path of the lazily initialised logger.
     *
     * Takes this instance's monitor, creates the logger through {@code Logging.logger$}
     * if the {@code bitmap$0} flag is still unset, then raises the flag. The flag is
     * written after the logger so that readers who observe it as true also see the logger.
     *
     * @return the initialised logger
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            boolean alreadyInitialised = this.bitmap$0;
            if (!alreadyInitialised) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, creating it on first use.
     *
     * The volatile {@code bitmap$0} flag is checked without locking; only when it is
     * still unset does the call fall through to the synchronized
     * {@code logger$lzycompute()}.
     */
    @Override
    public Logger logger() {
        if (this.bitmap$0) {
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /** Returns the current log-message prefix. */
    @Override
    public String logIdent() {
        return this.logIdent;
    }

    /** Replaces the log-message prefix. The constructor calls this once with the channel-manager prefix. */
    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /** Returns the stored {@code Log4jController$} instance. */
    @Override
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    // The two '_setter_' methods below assign fields that are declared final.
    // NOTE(review): presumably they are invoked only during Logging.$init$(this) in the
    // constructor — confirm against the Logging interface. As written this is a
    // decompiler artifact and not valid Java source.
    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    @Override
    public final void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    /**
     * Returns the live, mutable map of per-broker state (keyed by boxed broker id).
     * No copy is made; within this class the map is accessed while holding the broker
     * lock, except in startRequestSendThread and addNewBroker, which take no lock
     * themselves.
     */
    public HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo() {
        return this.brokerStateInfo;
    }

    /** Returns the monitor object used to guard {@code brokerStateInfo}. */
    public Object kafka$controller$ControllerChannelManager$$brokerLock() {
        return this.kafka$controller$ControllerChannelManager$$brokerLock;
    }

    /**
     * Starts the request-send thread of every broker currently registered in
     * {@code brokerStateInfo}. The whole pass runs while holding the broker lock.
     */
    public void startup() {
        synchronized (this.kafka$controller$ControllerChannelManager$$brokerLock()) {
            this.brokerStateInfo().foreach((Function1 & Serializable & scala.Serializable)entry -> {
                // The map key (first tuple element) is the broker id.
                int registeredBrokerId = entry._1$mcI$sp();
                this.startRequestSendThread(registeredBrokerId);
                return BoxedUnit.UNIT;
            });
        }
    }

    /**
     * Tears down the connection state of every registered broker.
     *
     * Runs while holding the broker lock. Each broker is handed to
     * {@code removeExistingBroker}, which removes that broker's entry from
     * {@code brokerStateInfo}. Because that mutates the very map being walked, the
     * values are first copied into an immutable list and the copy is iterated; iterating
     * the live {@code values()} view of a mutable map while removing entries from it has
     * no guaranteed behaviour.
     */
    public void shutdown() {
        Object object = this.kafka$controller$ControllerChannelManager$$brokerLock();
        synchronized (object) {
            // Snapshot first: removeExistingBroker removes entries from brokerStateInfo.
            this.brokerStateInfo().values().toList().foreach((Function1 & Serializable & scala.Serializable)brokerState -> {
                this.removeExistingBroker(brokerState);
                return BoxedUnit.UNIT;
            });
        }
    }

    /**
     * Queues a request for delivery to the given broker.
     *
     * While holding the broker lock, looks up the broker's state. If the broker is
     * registered, a new {@code QueueItem} is appended to its message queue; otherwise a
     * warning is logged and the request is dropped.
     *
     * @param brokerId id of the destination broker
     * @param apiKey   API key stored in the queue item
     * @param request  request builder stored in the queue item
     * @param callback response callback stored in the queue item; the default supplied by
     *                 {@code sendRequest$default$4()} is {@code null}
     * @throws MatchError if the lookup result is neither {@code Some} nor {@code None}
     */
    public void sendRequest(int brokerId, ApiKeys apiKey, AbstractRequest.Builder<? extends AbstractRequest> request, Function1<AbstractResponse, BoxedUnit> callback) {
        synchronized (this.kafka$controller$ControllerChannelManager$$brokerLock()) {
            Option lookup = this.brokerStateInfo().get((Object)BoxesRunTime.boxToInteger((int)brokerId));
            if (None$.MODULE$.equals(lookup)) {
                // Unknown broker: log and drop the request.
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Not sending request %s to broker %d, since it is offline.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{request, BoxesRunTime.boxToInteger((int)brokerId)})));
                return;
            }
            if (!(lookup instanceof Some)) {
                throw new MatchError((Object)lookup);
            }
            ControllerBrokerStateInfo target = (ControllerBrokerStateInfo)((Some)lookup).value();
            target.messageQueue().put(new QueueItem(apiKey, request, callback));
        }
    }

    /**
     * Default value for the fourth parameter ({@code callback}) of {@code sendRequest}:
     * {@code null}, i.e. no callback.
     */
    public Function1<AbstractResponse, BoxedUnit> sendRequest$default$4() {
        return null;
    }

    /**
     * Registers a broker and starts its request-send thread, unless a broker with the
     * same id is already registered, in which case nothing happens.
     * Runs while holding the broker lock.
     *
     * @param broker the broker to connect to
     */
    public void addBroker(Broker broker) {
        synchronized (this.kafka$controller$ControllerChannelManager$$brokerLock()) {
            boolean alreadyRegistered = this.brokerStateInfo().contains((Object)BoxesRunTime.boxToInteger((int)broker.id()));
            if (alreadyRegistered) {
                return;
            }
            this.addNewBroker(broker);
            this.startRequestSendThread(broker.id());
        }
    }

    /**
     * Tears down and unregisters the broker with the given id while holding the broker
     * lock.
     *
     * The state is fetched with {@code HashMap.apply}, not {@code get}, so an id that is
     * not registered is expected to raise the map's missing-key exception rather than be
     * ignored.
     *
     * @param brokerId id of the broker to remove
     */
    public void removeBroker(int brokerId) {
        synchronized (this.kafka$controller$ControllerChannelManager$$brokerLock()) {
            Object key = (Object)BoxesRunTime.boxToInteger((int)brokerId);
            ControllerBrokerStateInfo registered = (ControllerBrokerStateInfo)this.brokerStateInfo().apply(key);
            this.removeExistingBroker(registered);
        }
    }

    /**
     * Builds all per-broker state for {@code broker} and stores it in
     * {@code brokerStateInfo} under the broker's id.
     *
     * Creates a message queue, a network client bound to the broker's node on the
     * inter-broker listener, a (not yet started) {@code RequestSendThread}, and a
     * queue-size gauge tagged with the broker id. This method does not take the broker
     * lock itself: {@code addBroker} calls it with the lock held, while the constructor
     * calls it without.
     *
     * @param broker the broker to create connection state for
     * @throws MatchError if {@code threadNamePrefix} is neither {@code None} nor {@code Some}
     */
    private void addNewBroker(Broker broker) {
        String string;
        // No capacity argument: the queue is unbounded.
        LinkedBlockingQueue<QueueItem> messageQueue = new LinkedBlockingQueue<QueueItem>();
        // NOTE(review): '$this' in the lambda below is a decompiler artifact for the enclosing instance.
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Controller %d trying to connect to broker %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())})));
        // Endpoint of the target broker on the configured inter-broker listener.
        Node brokerNode = broker.getNode(this.config.interBrokerListenerName());
        // Log prefix of the form "[Controller id=<this broker>, targetBrokerId=<target>] ".
        LogContext logContext = new LogContext(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"[Controller id=", ", targetBrokerId=", "] "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.config.brokerId()), brokerNode.idString()})));
        ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder((SecurityProtocol)this.config.interBrokerSecurityProtocol(), (JaasContext.Type)JaasContext.Type.SERVER, (AbstractConfig)this.config, (ListenerName)this.config.interBrokerListenerName(), (String)this.config.saslMechanismInterBrokerProtocol(), (boolean)this.config.saslInterBrokerHandshakeRequestEnable());
        // Selector metrics are grouped under "controller-channel" and tagged with broker-id=<target id>.
        // NOTE(review): the meaning of the numeric/boolean positional arguments here and in the
        // NetworkClient constructor below is not visible in this file — check their signatures.
        Selector selector = new Selector(-1, -1L, this.metrics, this.time, "controller-channel", (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"broker-id"), (Object)brokerNode.idString())}))).asJava(), false, channelBuilder, logContext);
        // The metadata updater is seeded with exactly one node: the target broker.
        NetworkClient networkClient = new NetworkClient((Selectable)selector, (MetadataUpdater)new ManualMetadataUpdater((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Node[]{brokerNode}))).asJava()), ((Object)BoxesRunTime.boxToInteger((int)this.config.brokerId())).toString(), 1, 0L, 0L, -1, -1, Predef$.MODULE$.Integer2int(this.config.requestTimeoutMs()), this.time, false, new ApiVersions(), logContext);
        // Thread name: "Controller-<id>-to-broker-<id>-send-thread", prefixed with "<prefix>:" when one is configured.
        Option<String> option = this.threadNamePrefix;
        if (None$.MODULE$.equals(option)) {
            string = new StringOps(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())}));
        } else if (option instanceof Some) {
            Some some = (Some)option;
            String name = (String)some.value();
            string = new StringOps(Predef$.MODULE$.augmentString("%s:Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{name, BoxesRunTime.boxToInteger((int)this.config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())}));
        } else {
            throw new MatchError(option);
        }
        String threadName = string;
        // The thread is only constructed here; it is started later by startRequestSendThread.
        RequestSendThread requestThread = new RequestSendThread(this.config.brokerId(), this.controllerContext, messageQueue, networkClient, brokerNode, this.config, this.time, this.stateChangeLogger, threadName);
        requestThread.setDaemon(false);
        // Gauge reporting the current size of this broker's message queue.
        Gauge<Object> queueSizeGauge = this.newGauge(ControllerChannelManager$.MODULE$.QueueSizeMetricName(), (Gauge)new Gauge<Object>(null, messageQueue){
            private final LinkedBlockingQueue messageQueue$1;

            public int value() {
                return this.messageQueue$1.size();
            }
            {
                this.messageQueue$1 = messageQueue$1;
            }
        }, (Map<String, String>)this.queueSizeTags(broker.id()));
        this.brokerStateInfo().put((Object)BoxesRunTime.boxToInteger((int)broker.id()), (Object)new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread, queueSizeGauge));
    }

    /**
     * Builds the metric tag map for a broker's queue-size gauge: a single entry mapping
     * {@code "broker-id"} to the decimal string form of {@code brokerId}.
     *
     * @param brokerId id of the broker the gauge belongs to
     * @return an immutable one-entry tag map
     */
    private scala.collection.immutable.Map<String, String> queueSizeTags(int brokerId) {
        String idText = String.valueOf(brokerId);
        Tuple2 brokerIdTag = new Tuple2((Object)"broker-id", (Object)idText);
        Seq tagEntries = (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{brokerIdTag});
        return (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply(tagEntries);
    }

    /**
     * Releases everything held for one broker and unregisters it.
     *
     * Steps, in order: shut down the send thread, close the network client, clear the
     * pending message queue, remove the queue-size gauge, and remove the broker's entry
     * from {@code brokerStateInfo}. Any {@code Throwable} raised along the way is logged
     * and not rethrown; the steps after the failing one are then skipped, so the broker
     * may remain registered.
     *
     * This method takes no lock itself; its callers in this class hold the broker lock.
     *
     * @param brokerState state of the broker to remove
     */
    private void removeExistingBroker(ControllerBrokerStateInfo brokerState) {
        try {
            // NOTE(review): the send thread is stopped before its network client is closed;
            // presumably so the client is never closed while the thread is still using it — confirm.
            brokerState.requestSendThread().shutdown();
            brokerState.networkClient().close();
            brokerState.messageQueue().clear();
            this.removeMetric(ControllerChannelManager$.MODULE$.QueueSizeMetricName(), (Map<String, String>)this.queueSizeTags(brokerState.brokerNode().id()));
            this.brokerStateInfo().remove((Object)BoxesRunTime.boxToInteger((int)brokerState.brokerNode().id()));
        }
        catch (Throwable e) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Error while removing broker by the controller", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
        }
    }

    /**
     * Starts the request-send thread of the given broker if it has not been started yet.
     *
     * The thread is started only when its state is {@code Thread.State.NEW}; calling this
     * again for an already started (or finished) thread does nothing. The state is
     * fetched with {@code HashMap.apply}, so an unregistered id is expected to raise the
     * map's missing-key exception. This method does not take the broker lock itself.
     *
     * @param brokerId id of the broker whose send thread should be started
     */
    public void startRequestSendThread(int brokerId) {
        ControllerBrokerStateInfo registered = (ControllerBrokerStateInfo)this.brokerStateInfo().apply((Object)BoxesRunTime.boxToInteger((int)brokerId));
        RequestSendThread sender = registered.requestSendThread();
        if (sender.getState() == Thread.State.NEW) {
            sender.start();
        }
    }

    /**
     * Creates the channel manager and prepares connection state for every broker that
     * {@code controllerContext.liveBrokers()} reports at construction time.
     *
     * Besides storing its arguments, the constructor initialises the Logging and
     * KafkaMetricsGroup mix-ins, sets the log prefix to
     * {@code "[Channel manager on controller <brokerId>]: "}, registers a
     * {@code TotalQueueSize} gauge, and calls {@code addNewBroker} for each live broker.
     * No send thread is started here; that happens in {@code startup()}.
     *
     * @param controllerContext source of the initial live-broker set; also passed to each send thread
     * @param config            broker configuration
     * @param time              clock passed to the network objects and send threads
     * @param metrics           metrics registry passed to each selector
     * @param stateChangeLogger logger passed to each send thread
     * @param threadNamePrefix  optional prefix for send-thread names
     */
    public ControllerChannelManager(ControllerContext controllerContext, KafkaConfig config, Time time, Metrics metrics, StateChangeLogger stateChangeLogger, Option<String> threadNamePrefix) {
        this.controllerContext = controllerContext;
        this.config = config;
        this.time = time;
        this.metrics = metrics;
        this.stateChangeLogger = stateChangeLogger;
        this.threadNamePrefix = threadNamePrefix;
        Logging.$init$(this);
        KafkaMetricsGroup.$init$(this);
        this.brokerStateInfo = new HashMap();
        this.kafka$controller$ControllerChannelManager$$brokerLock = new Object();
        this.logIdent_$eq("[Channel manager on controller " + config.brokerId() + "]: ");
        // Gauge reporting the sum of all per-broker queue sizes, computed under the broker lock.
        // NOTE(review): the anonymous class below contains decompiler artifacts (synthetic
        // $outer handling, a malformed MethodHandle literal) and is not valid Java as written.
        this.newGauge("TotalQueueSize", new Gauge<Object>(this){
            private final /* synthetic */ ControllerChannelManager $outer;

            public int value() {
                int n;
                Object object = this.$outer.kafka$controller$ControllerChannelManager$$brokerLock();
                synchronized (object) {
                    n = BoxesRunTime.unboxToInt((Object)this.$outer.brokerStateInfo().values().iterator().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToInteger((int)$anon$1.$anonfun$value$1(x$1))).sum((Numeric)Numeric.IntIsIntegral$.MODULE$));
                }
                return n;
            }

            public static final /* synthetic */ int $anonfun$value$1(ControllerBrokerStateInfo x$1) {
                return x$1.messageQueue().size();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$value$1$adapted(kafka.controller.ControllerBrokerStateInfo )}, serializedLambda);
            }
        }, this.newGauge$default$3());
        // Build state for the brokers known at construction time; the broker lock is not held here.
        controllerContext.liveBrokers().foreach((Function1 & Serializable & scala.Serializable)broker -> {
            this.addNewBroker(broker);
            return BoxedUnit.UNIT;
        });
    }
}

