/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.streaming.kafka010.CacheKey;
import org.apache.spark.streaming.kafka010.InternalKafkaConsumer;
import org.apache.spark.streaming.kafka010.KafkaDataConsumer;
import org.slf4j.Logger;
import scala.Function0;
import scala.Product;
import scala.runtime.BoxedUnit;
import scala.runtime.LambdaDeserialize;
import scala.runtime.LazyRef;

/**
 * Singleton (published through {@link #MODULE$}) that hands out Kafka consumers and keeps a
 * cache of {@link InternalKafkaConsumer} instances keyed by {@link CacheKey}, where a key is
 * built from a group id and a {@link TopicPartition}.
 *
 * <p>{@link #init}, {@link #acquire} and the release method all run while holding this
 * object's monitor, so cache lookups and updates made through them are mutually exclusive.
 * The {@code log*} methods simply forward to the static helpers on {@link Logging}.
 *
 * <p>NOTE(review): this file is decompiler output (see the file header). Several constructs
 * are decompiler artifacts rather than well-formed Java, e.g. the field assignment placed
 * before {@code super(...)} in the anonymous map class, the {@code $this} reference inside a
 * lambda, and the {@code MethodHandle[]} initializer in {@code $deserializeLambda$}.
 */
public final class KafkaDataConsumer$
implements Logging {
    /** The single instance; assigned by the private constructor. */
    public static KafkaDataConsumer$ MODULE$;
    /** Consumer cache; {@code null} until {@link #init} has created it. */
    private Map<CacheKey, InternalKafkaConsumer<?, ?>> cache;
    /** Backing field for the logger accessor pair required by {@link Logging}. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // Creates the singleton; the constructor stores the new instance into MODULE$.
    static {
        new KafkaDataConsumer$();
    }

    /** Forwards to {@code Logging.logName$}. */
    public String logName() {
        return Logging.logName$((Logging)this);
    }

    /** Forwards to {@code Logging.log$}. */
    public Logger log() {
        return Logging.log$((Logging)this);
    }

    /** Forwards to {@code Logging.logInfo$} with the given message function. */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logDebug$} with the given message function. */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logTrace$} with the given message function. */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logWarning$} with the given message function. */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logError$} with the given message function. */
    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logInfo$} with the given message function and throwable. */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to {@code Logging.logDebug$} with the given message function and throwable. */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to {@code Logging.logTrace$} with the given message function and throwable. */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to {@code Logging.logWarning$} with the given message function and throwable. */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to {@code Logging.logError$} with the given message function and throwable. */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to {@code Logging.isTraceEnabled$}. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    /** Forwards to the one-argument {@code Logging.initializeLogIfNecessary$}. */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    /** Forwards to the two-argument {@code Logging.initializeLogIfNecessary$} and returns its result. */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /** Forwards to {@code Logging.initializeLogIfNecessary$default$2$} (default for the second argument). */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    /** Getter for the logger backing field. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger backing field. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Returns the consumer cache, or {@code null} if {@link #init} has not created it yet. */
    public Map<CacheKey, InternalKafkaConsumer<?, ?>> cache() {
        return this.cache;
    }

    /** Replaces the consumer cache reference. */
    public void cache_$eq(Map<CacheKey, InternalKafkaConsumer<?, ?>> x$1) {
        this.cache = x$1;
    }

    /**
     * Creates the consumer cache if it does not exist yet; when the cache is already non-null
     * the call does nothing (not even logging).
     *
     * <p>The cache is a {@link LinkedHashMap} built with access ordering enabled (third
     * constructor argument {@code true}) whose {@code removeEldestEntry} evicts the eldest
     * entry only when that consumer is not in use and the map size exceeds {@code maxCapacity}.
     *
     * @param initialCapacity initial capacity passed to the {@link LinkedHashMap} constructor
     * @param maxCapacity     size above which an unused eldest entry is evicted
     * @param loadFactor      load factor passed to the {@link LinkedHashMap} constructor
     */
    public synchronized void init(int initialCapacity, int maxCapacity, float loadFactor) {
        block0: {
            // Already initialized: keep the existing cache and its settings.
            if (this.cache() != null) break block0;
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(21).append("Initializing cache ").append(initialCapacity).append(" ").append(maxCapacity).append(" ").append(loadFactor).toString());
            this.cache_$eq(new LinkedHashMap<CacheKey, InternalKafkaConsumer<?, ?>>(initialCapacity, maxCapacity, loadFactor){
                // Captured copy of init's maxCapacity argument.
                private final int maxCapacity$1;

                // Returns true (evict) only for an eldest entry that is not in use while the map
                // is over capacity; an in-use eldest entry is kept even when over capacity.
                public boolean removeEldestEntry(Map.Entry<CacheKey, InternalKafkaConsumer<?, ?>> entry) {
                    boolean bl;
                    if (!entry.getValue().inUse() && this.size() > this.maxCapacity$1) {
                        KafkaDataConsumer$.MODULE$.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("KafkaConsumer cache hitting max capacity of ").append($this.maxCapacity$1).append(", ").append(new StringBuilder(22).append("removing consumer for ").append(entry.getKey()).toString()).toString());
                        try {
                            entry.getValue().close();
                        }
                        catch (KafkaException x) {
                            // A failed close is only logged; the entry is still evicted below.
                            KafkaDataConsumer$.MODULE$.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Error closing oldest Kafka consumer", x);
                        }
                        bl = true;
                    } else {
                        bl = false;
                    }
                    return bl;
                }
                // Decompiler rendering of the anonymous class constructor: stores the captured
                // maxCapacity, then calls LinkedHashMap(initialCapacity, loadFactor, accessOrder = true).
                {
                    this.maxCapacity$1 = maxCapacity$1;
                    super(initialCapacity$1, loadFactor$1, true);
                }

                // Synthetic hook for deserializing the lambdas above; the array initializer is
                // decompiler pseudo-syntax.
                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$removeEldestEntry$1(org.apache.spark.streaming.kafka010.KafkaDataConsumer$$anon$1 java.util.Map$Entry ), $anonfun$removeEldestEntry$2()}, serializedLambda);
                }
            });
        }
    }

    /**
     * Returns a consumer wrapper for the given topic partition, either backed by the cache or
     * by a freshly created, non-cached {@link InternalKafkaConsumer}.
     *
     * <p>Branches, in order:
     * <ol>
     *   <li>Task reattempt ({@code context} non-null and {@code attemptNumber() >= 1}): a cached
     *       consumer for the key, if any, is marked for close when in use, otherwise closed and
     *       removed from the cache; a new non-cached consumer is returned.</li>
     *   <li>{@code useCache} is false: a new non-cached consumer is returned.</li>
     *   <li>No cached consumer for the key: a new consumer is put in the cache and returned as
     *       a cached consumer.</li>
     *   <li>Cached consumer is in use: a new non-cached consumer is returned.</li>
     *   <li>Otherwise the cached consumer is flagged in use and returned.</li>
     * </ol>
     *
     * <p>NOTE(review): {@code cache()} is dereferenced without a null check, so this throws
     * {@link NullPointerException} if {@link #init} has not created the cache yet — confirm
     * that every caller initializes first.
     *
     * @param topicPartition partition the consumer is for; part of the cache key
     * @param kafkaParams    consumer configuration; its {@code "group.id"} entry is read and
     *                       cast to {@link String} to build the cache key
     * @param context        task context, may be {@code null}; only {@code attemptNumber()} is read
     * @param useCache       whether a cached consumer may be used or created
     * @return a {@code NonCachedKafkaDataConsumer} or {@code CachedKafkaDataConsumer}
     */
    public synchronized <K, V> KafkaDataConsumer<K, V> acquire(TopicPartition topicPartition, Map<String, Object> kafkaParams, TaskContext context, boolean useCache) {
        Product product;
        // Holder for the lazily created consumer: every newInternalConsumer$1 call below with
        // this holder yields the same instance, constructed on first use.
        LazyRef newInternalConsumer$lzy = new LazyRef();
        String groupId = (String)kafkaParams.get("group.id");
        CacheKey key = new CacheKey(groupId, topicPartition);
        InternalKafkaConsumer<?, ?> existingInternalConsumer = this.cache().get(key);
        if (context != null && context.attemptNumber() >= 1) {
            // 'object' only carries the (unused) value of the branch expression.
            Object object;
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("Reattempt detected, invalidating cached consumer ").append(existingInternalConsumer).toString());
            if (existingInternalConsumer != null) {
                if (existingInternalConsumer.inUse()) {
                    // Still held elsewhere: defer the close to the release method.
                    existingInternalConsumer.markedForClose_$eq(true);
                    object = BoxedUnit.UNIT;
                } else {
                    // Idle: close it now and drop it from the cache.
                    existingInternalConsumer.close();
                    object = this.cache().remove(key);
                }
            } else {
                object = BoxedUnit.UNIT;
            }
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(62).append("Reattempt detected, new non-cached consumer will be allocated ").append(String.valueOf(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy))).toString());
            product = new KafkaDataConsumer.NonCachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy));
        } else if (!useCache) {
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Cache usage turned off, new non-cached consumer will be allocated ").append(String.valueOf(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy))).toString());
            product = new KafkaDataConsumer.NonCachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy));
        } else if (existingInternalConsumer == null) {
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(58).append("No cached consumer, new cached consumer will be allocated ").append(String.valueOf(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy))).toString());
            // inUse is not set here; presumably a new InternalKafkaConsumer starts out in use
            // (its constructor is not shown) — TODO confirm.
            this.cache().put(key, KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy));
            product = new KafkaDataConsumer.CachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy));
        } else if (existingInternalConsumer.inUse()) {
            // The cached consumer is busy: the cache is left untouched.
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(70).append("Used cached consumer found, new non-cached consumer will be allocated ").append(String.valueOf(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy))).toString());
            product = new KafkaDataConsumer.NonCachedKafkaDataConsumer(KafkaDataConsumer$.newInternalConsumer$1(topicPartition, kafkaParams, newInternalConsumer$lzy));
        } else {
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("Not used cached consumer found, re-using it ").append(existingInternalConsumer).toString());
            existingInternalConsumer.inUse_$eq(true);
            product = new KafkaDataConsumer.CachedKafkaDataConsumer(existingInternalConsumer);
        }
        return product;
    }

    /**
     * Gives back a consumer that was handed out as a cached consumer.
     *
     * <p>Runs under this object's monitor. If {@code internalConsumer} is the very instance
     * currently stored in the cache under its key (reference comparison), it is either closed
     * and removed (when marked for close) or flagged as no longer in use. If the cache holds a
     * different instance or none, the consumer is closed and an info message is logged.
     *
     * @param internalConsumer the consumer being released; its group id and topic partition
     *                         are used to rebuild the cache key
     */
    public void org$apache$spark$streaming$kafka010$KafkaDataConsumer$$release(InternalKafkaConsumer<?, ?> internalConsumer) {
        KafkaDataConsumer$ kafkaDataConsumer$ = this;
        synchronized (kafkaDataConsumer$) {
            // 'object' only carries the (unused) value of the branch expression.
            Object object;
            CacheKey key = new CacheKey(internalConsumer.groupId(), internalConsumer.topicPartition());
            InternalKafkaConsumer<?, ?> cachedInternalConsumer = this.cache().get(key);
            if (internalConsumer == cachedInternalConsumer) {
                if (internalConsumer.markedForClose()) {
                    // Deferred close requested by acquire during a task reattempt.
                    internalConsumer.close();
                    object = this.cache().remove(key);
                } else {
                    internalConsumer.inUse_$eq(false);
                    object = BoxedUnit.UNIT;
                }
            } else {
                // Not the cached instance (e.g. its entry was replaced or removed): just close it.
                internalConsumer.close();
                this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(70).append("Released a supposedly cached consumer that was not found in the cache ").append(String.valueOf(internalConsumer)).toString());
                object = BoxedUnit.UNIT;
            }
        }
    }

    /**
     * Slow path of the lazy consumer holder: while holding the holder's monitor, returns the
     * stored consumer if one exists, otherwise constructs a new {@link InternalKafkaConsumer}
     * from the given partition and parameters and stores it in the holder.
     */
    private static final /* synthetic */ InternalKafkaConsumer newInternalConsumer$lzycompute$1(TopicPartition topicPartition$1, Map kafkaParams$1, LazyRef newInternalConsumer$lzy$1) {
        InternalKafkaConsumer internalKafkaConsumer;
        LazyRef lazyRef = newInternalConsumer$lzy$1;
        synchronized (lazyRef) {
            // Re-check under the lock so the consumer is constructed at most once per holder.
            internalKafkaConsumer = newInternalConsumer$lzy$1.initialized() ? (InternalKafkaConsumer)newInternalConsumer$lzy$1.value() : (InternalKafkaConsumer)newInternalConsumer$lzy$1.initialize(new InternalKafkaConsumer(topicPartition$1, kafkaParams$1));
        }
        return internalKafkaConsumer;
    }

    /**
     * Returns the consumer stored in the holder, taking the unlocked fast path when the holder
     * is already initialized and falling back to {@code newInternalConsumer$lzycompute$1}
     * otherwise.
     */
    private static final InternalKafkaConsumer newInternalConsumer$1(TopicPartition topicPartition$1, Map kafkaParams$1, LazyRef newInternalConsumer$lzy$1) {
        return newInternalConsumer$lzy$1.initialized() ? (InternalKafkaConsumer)newInternalConsumer$lzy$1.value() : KafkaDataConsumer$.newInternalConsumer$lzycompute$1(topicPartition$1, kafkaParams$1, newInternalConsumer$lzy$1);
    }

    /**
     * Private constructor, invoked once from the static initializer: publishes the instance
     * through {@link #MODULE$}, runs {@code Logging.$init$}, and leaves the cache unset.
     */
    private KafkaDataConsumer$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
        this.cache = null;
    }
}

