/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.eventhubs.client;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import java.io.Serializable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import org.apache.spark.SparkEnv$;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventHubsUtils$;
import org.apache.spark.eventhubs.NameAndPartition;
import org.apache.spark.eventhubs.PartitionPerformanceReceiver$;
import org.apache.spark.eventhubs.client.CachedEventHubsReceiver;
import org.apache.spark.eventhubs.client.CachedReceiver;
import org.apache.spark.eventhubs.package$;
import org.apache.spark.internal.Logging;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.util.RpcUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.runtime.BoxesRunTime;

public final class CachedEventHubsReceiver$
implements CachedReceiver,
Logging {
    public static CachedEventHubsReceiver$ MODULE$;
    private final long startRecieverTimeNs;
    private final HashMap<String, CachedEventHubsReceiver> receivers;
    private final RpcEndpointRef partitionPerformanceReceiverRef;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new CachedEventHubsReceiver$();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private long startRecieverTimeNs() {
        return this.startRecieverTimeNs;
    }

    public RpcEndpointRef partitionPerformanceReceiverRef() {
        return this.partitionPerformanceReceiverRef;
    }

    private String key(EventHubsConf ehConf, NameAndPartition nAndP) {
        return (ehConf.connectionString() + ehConf.consumerGroup() + nAndP.partitionId()).toLowerCase();
    }

    @Override
    public Iterator<EventData> receive(EventHubsConf ehConf, NameAndPartition nAndP, long requestSeqNo, int batchSize) {
        Iterator<EventData> iterator;
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"(TID ", ") EventHubsCachedReceiver look up. For namespaceUri ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)taskId), ehConf.namespaceUri()})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"EventHubNameAndPartition ", " consumer group ", ". "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{nAndP, ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"requestSeqNo: ", ", batchSize: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)requestSeqNo), BoxesRunTime.boxToInteger((int)batchSize)})));
        CachedEventHubsReceiver receiver = null;
        HashMap<String, CachedEventHubsReceiver> hashMap = this.receivers;
        synchronized (hashMap) {
            receiver = (CachedEventHubsReceiver)this.receivers.getOrElseUpdate((Object)this.key(ehConf, nAndP), (Function0 & Serializable & scala.Serializable)() -> MODULE$.apply(ehConf, nAndP, requestSeqNo));
        }
        try {
            iterator = receiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(requestSeqNo, batchSize);
        }
        catch (CompletionException completionExecution) {
            Throwable exceptionCause = completionExecution.getCause();
            if (exceptionCause != null && exceptionCause instanceof RejectedExecutionException && exceptionCause.getMessage().contains("ReactorDispatcher instance is closed")) {
                this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"(TID ", ") EventHubsCachedReceiver receive execution for namespaceUri ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)taskId), ehConf.namespaceUri()})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"EventHubNameAndPartition ", " consumer group ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{nAndP, ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"failed with ", ". Try to recreate the entire CachedEventHubsReceiver instance in order to "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{completionExecution})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"use a fresh EventHubClient from the underlying java SDK, then try receiving events again."})).s((Seq)Nil$.MODULE$));
                receiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$client().close();
                receiver = this.apply(ehConf, nAndP, requestSeqNo);
                HashMap<String, CachedEventHubsReceiver> hashMap2 = this.receivers;
                synchronized (hashMap2) {
                    this.receivers.update((Object)this.key(ehConf, nAndP), (Object)receiver);
                }
                iterator = receiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(requestSeqNo, batchSize);
            }
            if (exceptionCause != null && exceptionCause instanceof ReceiverDisconnectedException) {
                this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"(TID ", ") EventHubsCachedReceiver receive execution for namespaceUri ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)taskId), ehConf.namespaceUri()})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"EventHubNameAndPartition ", " consumer group ", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{nAndP, ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"failed because another receiver for the same <NS-EH-CG-Part> combo has been created and caused this one "})).s((Seq)Nil$.MODULE$) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"to get disconnected. The full error is: ", ". Throw the exception so that the driver can "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{completionExecution})) + new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"retry the task."})).s((Seq)Nil$.MODULE$));
                throw completionExecution;
            }
            throw completionExecution;
        }
        return iterator;
    }

    public CachedEventHubsReceiver apply(EventHubsConf ehConf, NameAndPartition nAndP, long startSeqNo) {
        return new CachedEventHubsReceiver(ehConf, nAndP, startSeqNo);
    }

    private CachedEventHubsReceiver$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
        this.startRecieverTimeNs = System.nanoTime();
        this.receivers = new HashMap();
        this.partitionPerformanceReceiverRef = RpcUtils$.MODULE$.makeDriverRef(PartitionPerformanceReceiver$.MODULE$.ENDPOINT_NAME(), SparkEnv$.MODULE$.get().conf(), SparkEnv$.MODULE$.get().rpcEnv());
    }
}

