/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKey;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.streaming.kafka010.CacheKey;
import org.apache.spark.streaming.kafka010.InternalKafkaConsumer;
import org.apache.spark.streaming.kafka010.KafkaDataConsumer;
import org.slf4j.Logger;
import scala.Function0;
import scala.StringContext;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.LazyRef;
import scala.runtime.ScalaRunTime$;

public final class KafkaDataConsumer$
implements Logging {
    public static final KafkaDataConsumer$ MODULE$ = new KafkaDataConsumer$();
    private static Map<CacheKey, InternalKafkaConsumer<?, ?>> cache;
    private static transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        Logging.$init$((Logging)MODULE$);
        cache = null;
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public Logging.LogStringContext LogStringContext(StringContext sc) {
        return Logging.LogStringContext$((Logging)this, (StringContext)sc);
    }

    public void withLogContext(Map<String, String> context, Function0<BoxedUnit> body) {
        Logging.withLogContext$((Logging)this, context, body);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logInfo(LogEntry entry) {
        Logging.logInfo$((Logging)this, (LogEntry)entry);
    }

    public void logInfo(LogEntry entry, Throwable throwable) {
        Logging.logInfo$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logDebug(LogEntry entry) {
        Logging.logDebug$((Logging)this, (LogEntry)entry);
    }

    public void logDebug(LogEntry entry, Throwable throwable) {
        Logging.logDebug$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logTrace(LogEntry entry) {
        Logging.logTrace$((Logging)this, (LogEntry)entry);
    }

    public void logTrace(LogEntry entry, Throwable throwable) {
        Logging.logTrace$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logWarning(LogEntry entry) {
        Logging.logWarning$((Logging)this, (LogEntry)entry);
    }

    public void logWarning(LogEntry entry, Throwable throwable) {
        Logging.logWarning$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logError(LogEntry entry) {
        Logging.logError$((Logging)this, (LogEntry)entry);
    }

    public void logError(LogEntry entry, Throwable throwable) {
        Logging.logError$((Logging)this, (LogEntry)entry, (Throwable)throwable);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    public Map<CacheKey, InternalKafkaConsumer<?, ?>> cache() {
        return cache;
    }

    public void cache_$eq(Map<CacheKey, InternalKafkaConsumer<?, ?>> x$1) {
        cache = x$1;
    }

    public synchronized void init(int initialCapacity, int maxCapacity, float loadFactor) {
        if (this.cache() == null) {
            this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Initializing cache ", " "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.INITIAL_CAPACITY$.MODULE$, (Object)BoxesRunTime.boxToInteger((int)initialCapacity))})).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", " ", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.MAX_CAPACITY$.MODULE$, (Object)BoxesRunTime.boxToInteger((int)maxCapacity)), new MDC((LogKey)LogKeys.LOAD_FACTOR$.MODULE$, (Object)BoxesRunTime.boxToFloat((float)loadFactor))})))));
            this.cache_$eq(new LinkedHashMap<CacheKey, InternalKafkaConsumer<?, ?>>(initialCapacity, loadFactor, maxCapacity){
                private final int maxCapacity$1;

                public boolean removeEldestEntry(Map.Entry<CacheKey, InternalKafkaConsumer<?, ?>> entry) {
                    if (!entry.getValue().inUse() && this.size() > this.maxCapacity$1) {
                        KafkaDataConsumer$.MODULE$.logWarning(LogEntry$.MODULE$.from((Function0 & Serializable)() -> KafkaDataConsumer$.MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"KafkaConsumer cache hitting max capacity of ", ", "}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.MAX_CAPACITY$.MODULE$, (Object)BoxesRunTime.boxToInteger((int)$this.maxCapacity$1))})).$plus(KafkaDataConsumer$.MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"removing consumer for ", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.KEY$.MODULE$, entry.getKey())})))));
                        try {
                            entry.getValue().close();
                        }
                        catch (KafkaException x) {
                            KafkaDataConsumer$.MODULE$.logError((Function0<String>)(Function0 & Serializable)() -> "Error closing oldest Kafka consumer", (Throwable)x);
                        }
                        return true;
                    }
                    return false;
                }
                {
                    this.maxCapacity$1 = maxCapacity$1;
                    super(initialCapacity$1, loadFactor$1, true);
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$removeEldestEntry$1(org.apache.spark.streaming.kafka010.KafkaDataConsumer$$anon$1 java.util.Map$Entry ), $anonfun$removeEldestEntry$2()}, serializedLambda);
                }
            });
            return;
        }
    }

    public synchronized <K, V> KafkaDataConsumer<K, V> acquire(TopicPartition topicPartition, Map<String, Object> kafkaParams, TaskContext context, boolean useCache) {
        LazyRef newInternalConsumer$lzy = new LazyRef();
        String groupId = (String)kafkaParams.get("group.id");
        CacheKey key = new CacheKey(groupId, topicPartition);
        InternalKafkaConsumer<?, ?> existingInternalConsumer = this.cache().get(key);
        if (context != null && context.attemptNumber() >= 1) {
            this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Reattempt detected, invalidating cached consumer " + existingInternalConsumer);
            if (existingInternalConsumer != null) {
                if (existingInternalConsumer.inUse()) {
                    existingInternalConsumer.markedForClose_$eq(true);
                    v0 = BoxedUnit.UNIT;
                } else {
                    existingInternalConsumer.close();
                    v0 = this.cache().remove(key);
                }
            } else {
                v0 = BoxedUnit.UNIT;
            }
            this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Reattempt detected, new non-cached consumer will be allocated " + KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
            return new KafkaDataConsumer.NonCachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
        }
        if (!useCache) {
            this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Cache usage turned off, new non-cached consumer will be allocated " + KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
            return new KafkaDataConsumer.NonCachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
        }
        if (existingInternalConsumer == null) {
            this.logDebug((Function0<String>)(Function0 & Serializable)() -> "No cached consumer, new cached consumer will be allocated " + KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
            this.cache().put(key, KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
            return new KafkaDataConsumer.CachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
        }
        if (existingInternalConsumer.inUse()) {
            this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Used cached consumer found, new non-cached consumer will be allocated " + KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
            return new KafkaDataConsumer.NonCachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(newInternalConsumer$lzy, topicPartition, kafkaParams));
        }
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Not used cached consumer found, re-using it " + existingInternalConsumer);
        existingInternalConsumer.inUse_$eq(true);
        return new KafkaDataConsumer.CachedKafkaDataConsumer(existingInternalConsumer);
    }

    public void org$apache$spark$streaming$kafka010$KafkaDataConsumer$$release(InternalKafkaConsumer<?, ?> internalConsumer) {
        KafkaDataConsumer$ kafkaDataConsumer$ = this;
        synchronized (kafkaDataConsumer$) {
            CacheKey key = new CacheKey(internalConsumer.groupId(), internalConsumer.topicPartition());
            InternalKafkaConsumer<?, ?> cachedInternalConsumer = this.cache().get(key);
            if (internalConsumer == cachedInternalConsumer) {
                if (internalConsumer.markedForClose()) {
                    internalConsumer.close();
                    v0 = this.cache().remove(key);
                } else {
                    internalConsumer.inUse_$eq(false);
                    v0 = BoxedUnit.UNIT;
                }
            } else {
                internalConsumer.close();
                this.logInfo(LogEntry$.MODULE$.from((Function0 & Serializable)() -> MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"Released a supposedly cached consumer that was not found in the cache "}))).log((Seq)Nil$.MODULE$).$plus(MODULE$.LogStringContext(new StringContext((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"", ""}))).log((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new MDC[]{new MDC((LogKey)LogKeys.CONSUMER$.MODULE$, (Object)internalConsumer)})))));
                v0 = BoxedUnit.UNIT;
            }
        }
    }

    private static final /* synthetic */ InternalKafkaConsumer newInternalConsumer$lzycompute$1(LazyRef newInternalConsumer$lzy$1, TopicPartition topicPartition$1, Map kafkaParams$1) {
        InternalKafkaConsumer internalKafkaConsumer;
        LazyRef lazyRef = newInternalConsumer$lzy$1;
        synchronized (lazyRef) {
            internalKafkaConsumer = newInternalConsumer$lzy$1.initialized() ? (InternalKafkaConsumer)newInternalConsumer$lzy$1.value() : (InternalKafkaConsumer)newInternalConsumer$lzy$1.initialize(new InternalKafkaConsumer(topicPartition$1, kafkaParams$1));
        }
        return internalKafkaConsumer;
    }

    private static final InternalKafkaConsumer newInternalConsumer$1(LazyRef newInternalConsumer$lzy$1, TopicPartition topicPartition$1, Map kafkaParams$1) {
        if (newInternalConsumer$lzy$1.initialized()) {
            return (InternalKafkaConsumer)newInternalConsumer$lzy$1.value();
        }
        return KafkaDataConsumer$.newInternalConsumer$lzycompute$1(newInternalConsumer$lzy$1, topicPartition$1, kafkaParams$1);
    }

    private KafkaDataConsumer$() {
    }
}

