/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.eventhubs.client;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.spark.SparkEnv$;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventHubsUtils$;
import org.apache.spark.eventhubs.EventPosition$;
import org.apache.spark.eventhubs.NameAndPartition;
import org.apache.spark.eventhubs.client.CachedEventHubsReceiver$;
import org.apache.spark.eventhubs.client.ClientConnectionPool$;
import org.apache.spark.eventhubs.package$;
import org.apache.spark.eventhubs.utils.MetricPlugin;
import org.apache.spark.eventhubs.utils.RetryUtils$;
import org.apache.spark.internal.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\tUa!B\u000e\u001d\u0001q1\u0003\u0002C\u001a\u0001\u0005\u0003\u0005\u000b\u0011B\u001b\t\u0011e\u0002!\u0011!Q\u0001\niB\u0001\"\u0010\u0001\u0003\u0002\u0003\u0006IA\u0010\u0005\u0006!\u0002!I!U\u0003\u0005/\u0002\u0001\u0001\f\u0003\u0005c\u0001!\u0015\r\u0011\"\u0003d\u0011!i\u0002\u0001#b\u0001\n\u0013i\u0007bB=\u0001\u0001\u0004%IA\u001f\u0005\b}\u0002\u0001\r\u0011\"\u0003\u0000\u0011\u001d\tY\u0001\u0001Q!\nmDq!!\u0004\u0001\t\u0013\ty\u0001C\u0004\u0002\u0016\u0001!I!a\u0006\t\u000f\u0005%\u0002\u0001\"\u0003\u0002,!9\u0011\u0011\u000e\u0001\u0005\n\u0005-\u0004bBA>\u0001\u0011%\u0011Q\u0010\u0005\b\u0003\u0003\u0003A\u0011BAB\u0011\u001d\tI\t\u0001C\u0005\u0003\u0017Cq!a(\u0001\t\u0013\t\tk\u0002\u0005\u0002HrA\t\u0001IAe\r\u001dYB\u0004#\u0001!\u0003\u0017Da\u0001\u0015\u000b\u0005\u0002\u0005MWABAk)\u0001\t9\u000e\u0003\u0005\u0002vR\u0001\u000b\u0011BA|\u0011\u001d\tI\u0010\u0006C\u0005\u0003wD\u0001\"!#\u0015\t\u0003r\"\u0011\u0001\u0005\b\u0005\u0017!B\u0011\u0001B\u0007\u0005]\u0019\u0015m\u00195fI\u00163XM\u001c;Ik\n\u001c(+Z2fSZ,'O\u0003\u0002\u001e=\u000511\r\\5f]RT!a\b\u0011\u0002\u0013\u00154XM\u001c;ik\n\u001c(BA\u0011#\u0003\u0015\u0019\b/\u0019:l\u0015\t\u0019C%\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002K\u0005\u0019qN]4\u0014\u0007\u00019S\u0006\u0005\u0002)W5\t\u0011FC\u0001+\u0003\u0015\u00198-\u00197b\u0013\ta\u0013F\u0001\u0004B]f\u0014VM\u001a\t\u0003]Ej\u0011a\f\u0006\u0003a\u0001\n\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003e=\u0012q\u0001T8hO&tw-\u0001\u0004fQ\u000e{gNZ\u0002\u0001!\t1t'D\u0001\u001f\u0013\tAdDA\u0007Fm\u0016tG\u000fS;cg\u000e{gNZ\u0001\u0006]\u0006sG\r\u0015\t\u0003mmJ!\u0001\u0010\u0010\u0003!9\u000bW.Z!oIB\u000b'\u000f^5uS>t\u0017AC:uCJ$8+Z9O_B\u0011q(\u0014\b\u0003\u0001.s!!\u0011&\u000f\u0005\tKeBA\"I\u001d\t!u)D\u0001F\u0015\t1E'\u0001\u0004=e>|GOP\u0005\u0002K%\u00111\u0005J\u0005\u0003C\tJ!a\b\u0011\n\u00051s\u0012a\u00029bG.\fw-Z\u0005\u0003\u001d>\u0013abU3rk\u0016t7
-\u001a(v[\n,'O\u0003\u0002M=\u00051A(\u001b8jiz\"BA\u0015+V-B\u00111\u000bA\u0007\u00029!)1\u0007\u0002a\u0001k!)\u0011\b\u0002a\u0001u!)Q\b\u0002a\u0001}\t)\u0012i^1jiRKW.Z8vi\u0016C8-\u001a9uS>t\u0007CA-a\u001b\u0005Q&BA.]\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003;z\u000bA!\u001e;jY*\tq,\u0001\u0003kCZ\f\u0017BA1[\u0005A!\u0016.\\3pkR,\u0005pY3qi&|g.\u0001\u0007nKR\u0014\u0018n\u0019)mk\u001eLg.F\u0001e!\rASmZ\u0005\u0003M&\u0012aa\u00149uS>t\u0007C\u00015l\u001b\u0005I'B\u00016\u001f\u0003\u0015)H/\u001b7t\u0013\ta\u0017N\u0001\u0007NKR\u0014\u0018n\u0019)mk\u001eLg.F\u0001o!\tyw/D\u0001q\u0015\ty\u0012O\u0003\u0002sg\u0006)\u0011M_;sK*\u0011A/^\u0001\n[&\u001c'o\\:pMRT\u0011A^\u0001\u0004G>l\u0017B\u0001=q\u00059)e/\u001a8u\u0011V\u00147\t\\5f]R\f\u0001B]3dK&4XM]\u000b\u0002wB\u0011q\u000e`\u0005\u0003{B\u0014\u0011\u0003U1si&$\u0018n\u001c8SK\u000e,\u0017N^3s\u00031\u0011XmY3jm\u0016\u0014x\fJ3r)\u0011\t\t!a\u0002\u0011\u0007!\n\u0019!C\u0002\u0002\u0006%\u0012A!\u00168ji\"A\u0011\u0011B\u0005\u0002\u0002\u0003\u000710A\u0002yIE\n\u0011B]3dK&4XM\u001d\u0011\u0002\u001d\r\u0014X-\u0019;f%\u0016\u001cW-\u001b<feR\u001910!\u0005\t\r\u0005M1\u00021\u0001?\u0003\u0015\u0019X-\u001d(p\u0003Ia\u0017m\u001d;SK\u000e,\u0017N^3e\u001f\u001a47/\u001a;\u0015\u0005\u0005e\u0001CBA\u000e\u0003?\t\u0019#\u0004\u0002\u0002\u001e)\u00111,K\u0005\u0005\u0003C\tiB\u0001\u0004GkR,(/\u001a\t\u0004Q\u0005\u0015\u0012bAA\u0014S\t!Aj\u001c8h\u0003)\u0011XmY3jm\u0016|e.\u001a\u000b\u0007\u0003[\t)%!\u0016\u0011\r\u0005m\u0011qDA\u0018!\u0019\t\t$!\u000f\u0002@9!\u00111GA\u001c\u001d\r!\u0015QG\u0005\u0002U%\u0011A*K\u0005\u0005\u0003w\tiD\u0001\u0005Ji\u0016\u0014\u0018M\u00197f\u0015\ta\u0015\u0006E\u0002p\u0003\u0003J1!a\u0011q\u0005%)e/\u001a8u\t\u0006$\u0018\rC\u0004\u0002H5\u0001\r!!\u0013\u0002\u000fQLW.Z8viB!\u00111JA)\u001b\t\tiEC\u0002\u0002Py\u000bA\u0001^5nK&!\u00111KA'\u0005!!UO]1uS>t\u0007bBA,\u001b\u0001\u0007\u0011\u0011L\u0001\u0004[N<\u0007\u0003BA.\u0003GrA!!\u0
018\u0002`A\u0011A)K\u0005\u0004\u0003CJ\u0013A\u0002)sK\u0012,g-\u0003\u0003\u0002f\u0005\u001d$AB*ue&twMC\u0002\u0002b%\nQb\u00197pg\u0016\u0014VmY3jm\u0016\u0014HCAA7!\u0019\tY\"a\b\u0002pA!\u0011\u0011OA<\u001b\t\t\u0019HC\u0002\u0002vy\u000bA\u0001\\1oO&!\u0011\u0011PA:\u0005\u00111v.\u001b3\u0002!I,7M]3bi\u0016\u0014VmY3jm\u0016\u0014H\u0003BA\u0001\u0003\u007fBa!a\u0005\u0010\u0001\u0004q\u0014aC2iK\u000e\\7)\u001e:t_J$B!!\f\u0002\u0006\"1\u0011q\u0011\tA\u0002y\nAB]3rk\u0016\u001cHoU3r\u001d>\fqA]3dK&4X\r\u0006\u0004\u0002\u000e\u0006M\u0015Q\u0013\t\u0007\u0003c\ty)a\u0010\n\t\u0005E\u0015Q\b\u0002\t\u0013R,'/\u0019;pe\"1\u0011qQ\tA\u0002yBq!a&\u0012\u0001\u0004\tI*A\u0005cCR\u001c\u0007nU5{KB\u0019\u0001&a'\n\u0007\u0005u\u0015FA\u0002J]R\f1#Y<bSR\u0014VmY3jm\u0016lUm]:bO\u0016,B!a)\u0002*R1\u0011QUA^\u0003\u000b\u0004B!a*\u0002*2\u0001AaBAV%\t\u0007\u0011Q\u0016\u0002\u0002)F!\u0011qVA[!\rA\u0013\u0011W\u0005\u0004\u0003gK#a\u0002(pi\"Lgn\u001a\t\u0004Q\u0005]\u0016bAA]S\t\u0019\u0011I\\=\t\u000f\u0005u&\u00031\u0001\u0002@\u0006I\u0011m^1ji\u0006\u0014G.\u001a\t\u0007\u00037\t\t-!*\n\t\u0005\r\u0017Q\u0004\u0002\n\u0003^\f\u0017\u000e^1cY\u0016Da!a\"\u0013\u0001\u0004q\u0014aF\"bG\",G-\u0012<f]RDUOY:SK\u000e,\u0017N^3s!\t\u0019FcE\u0003\u0015O\u00055W\u0006E\u0002T\u0003\u001fL1!!5\u001d\u00059\u0019\u0015m\u00195fIJ+7-Z5wKJ$\"!!3\u0003\u00155+H/\u00192mK6\u000b\u0007/\u0006\u0004\u0002Z\u0006-\u0018\u0011\u001f\t\t\u00037\f)/!;\u0002p6\u0011\u0011Q\u001c\u0006\u0005\u0003?\f\t/A\u0004nkR\f'\r\\3\u000b\u0007\u0005\r\u0018&\u0001\u0006d_2dWm\u0019;j_:LA!a:\u0002^\n9\u0001*Y:i\u001b\u0006\u0004\b\u0003BAT\u0003W$q!!<\u0017\u0005\u0004\tiKA\u0001B!\u0011\t9+!=\u0005\u000f\u0005MhC1\u0001\u0002.\n\t!)A\u0005sK\u000e,\u0017N^3sgB9\u00111\\As\u00033\u0012\u0016aA6fsR1\u0011\u0011LA\u007f\u0003\u007fDQa\r\rA\u0002UBQ!\u000f\rA\u0002i\"\"\"!$\u0003\u0004\t\u0015!q\u0001B\u0005\u0011\u0015\u0019\u0014\u00041\u00016\u0011\u0015I\u0014\u00041\u0001;\u0011\u0019\t9)
\u0007a\u0001}!9\u0011qS\rA\u0002\u0005e\u0015!B1qa2LHc\u0002*\u0003\u0010\tE!1\u0003\u0005\u0006gi\u0001\r!\u000e\u0005\u0006si\u0001\rA\u000f\u0005\u0006{i\u0001\rA\u0010")
public class CachedEventHubsReceiver
implements Logging {
    /*
     * Overview (decompiled from Scala by CFR; see file header).
     *
     * Wraps a single Event Hubs PartitionReceiver for the Event Hub / partition
     * described by nAndP, configured through ehConf. The receiver is created in
     * the constructor and replaced in place by recreateReceiver(...) when the
     * requested sequence number does not line up with the receiver's position,
     * when the receiver reports it is not open, or after a receive timeout.
     *
     * Note that the lambdas below reference a synthetic "$this" variable emitted
     * by the decompiler; it is not declared anywhere in this file.
     */

    // Value of ehConf.metricPlugin(); populated on first access (bit 0x1 of bitmap$0).
    private Option<MetricPlugin> metricPlugin;
    // Client obtained from ClientConnectionPool.borrowClient(ehConf); populated on first access (bit 0x2 of bitmap$0).
    private EventHubClient client;
    // Configuration supplied to the constructor.
    private final EventHubsConf ehConf;
    // Event Hub name and partition id supplied to the constructor.
    private final NameAndPartition nAndP;
    // Current receiver; reassigned by recreateReceiver(...) via receiver_$eq(...).
    private PartitionReceiver receiver;
    // Backing field for the Logging trait's logger accessor pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Initialisation flags for the lazily computed fields: 0x1 = metricPlugin, 0x2 = client.
    private volatile byte bitmap$0;

    /**
     * Static forwarder to the companion object's {@code apply}.
     *
     * @param eventHubsConf    configuration passed through to the companion
     * @param nameAndPartition Event Hub name / partition passed through to the companion
     * @param l                third argument passed through unchanged (the Scala signature
     *                         of the constructor names the matching parameter startSeqNo)
     * @return whatever {@code CachedEventHubsReceiver$.MODULE$.apply} returns
     */
    public static CachedEventHubsReceiver apply(EventHubsConf eventHubsConf, NameAndPartition nameAndPartition, long l) {
        return CachedEventHubsReceiver$.MODULE$.apply(eventHubsConf, nameAndPartition, l);
    }

    // ---------------------------------------------------------------------
    // Logging trait forwarders: each method delegates to the static
    // Logging.<name>$ helper with this instance as the receiver.
    // ---------------------------------------------------------------------

    /** Forwards to {@code Logging.logName$}. */
    public String logName() {
        return Logging.logName$((Logging)this);
    }

    /** Forwards to {@code Logging.log$}. */
    public Logger log() {
        return Logging.log$((Logging)this);
    }

    /** Forwards to {@code Logging.logInfo$}; {@code msg} is a deferred message supplier. */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logDebug$}; {@code msg} is a deferred message supplier. */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logTrace$}; {@code msg} is a deferred message supplier. */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logWarning$}; {@code msg} is a deferred message supplier. */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    /** Forwards to {@code Logging.logError$}; {@code msg} is a deferred message supplier. */
    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    /** Forwards to the two-argument {@code Logging.logInfo$}, passing {@code throwable} along. */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to the two-argument {@code Logging.logDebug$}, passing {@code throwable} along. */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to the two-argument {@code Logging.logTrace$}, passing {@code throwable} along. */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to the two-argument {@code Logging.logWarning$}, passing {@code throwable} along. */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to the two-argument {@code Logging.logError$}, passing {@code throwable} along. */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to {@code Logging.isTraceEnabled$}. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    /** Forwards to the one-flag {@code Logging.initializeLogIfNecessary$}. */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    /** Forwards to the two-flag {@code Logging.initializeLogIfNecessary$} and returns its result. */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /** Forwards to {@code Logging.initializeLogIfNecessary$default$2$} (Scala default-argument accessor). */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    /** Getter for the Logging trait's backing logger field. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the Logging trait's backing logger field. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Slow path for {@link #metricPlugin()}: while holding this instance's monitor,
     * reads {@code ehConf.metricPlugin()} once and records completion in bit 0x1 of
     * {@code bitmap$0}.
     *
     * @return the (now initialised) metricPlugin field
     */
    private Option<MetricPlugin> metricPlugin$lzycompute() {
        CachedEventHubsReceiver cachedEventHubsReceiver = this;
        synchronized (cachedEventHubsReceiver) {
            // Re-check the flag under the lock so the value is computed at most once.
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.metricPlugin = this.ehConf.metricPlugin();
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.metricPlugin;
    }

    /**
     * Returns the metric plugin option, computing it via
     * {@link #metricPlugin$lzycompute()} when bit 0x1 of {@code bitmap$0} is still clear.
     */
    private Option<MetricPlugin> metricPlugin() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.metricPlugin$lzycompute() : this.metricPlugin;
    }

    /**
     * Slow path for {@link #client()}: while holding this instance's monitor, borrows a
     * client from {@code ClientConnectionPool} for {@code ehConf} once and records
     * completion in bit 0x2 of {@code bitmap$0}.
     *
     * @return the (now initialised) client field
     */
    private EventHubClient client$lzycompute() {
        CachedEventHubsReceiver cachedEventHubsReceiver = this;
        synchronized (cachedEventHubsReceiver) {
            // Re-check the flag under the lock so the client is borrowed at most once.
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.client = ClientConnectionPool$.MODULE$.borrowClient(this.ehConf);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.client;
    }

    /**
     * Returns the Event Hubs client, borrowing it via {@link #client$lzycompute()}
     * when bit 0x2 of {@code bitmap$0} is still clear.
     */
    private EventHubClient client() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.client$lzycompute() : this.client;
    }

    /** Accessor for the current receiver field. */
    private PartitionReceiver receiver() {
        return this.receiver;
    }

    /** Mutator for the current receiver field (Scala {@code receiver_=}). */
    private void receiver_$eq(PartitionReceiver x$1) {
        this.receiver = x$1;
    }

    /**
     * Creates a new partition receiver positioned at {@code seqNo} and waits for it.
     *
     * The consumer group and prefetch count come from {@code ehConf}, falling back to
     * the package defaults. Creation is wrapped in {@code RetryUtils.retryJava} and the
     * resulting future is awaited for {@code ehConf.internalOperationTimeout()}.
     *
     * @param seqNo sequence number converted into the receiver's start position
     * @return the receiver produced by {@code EventHubsUtils.createReceiverInner}
     */
    private PartitionReceiver createReceiver(long seqNo) {
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(62).append("(TID ").append(taskId).append(") creating receiver for Event Hub ").append($this.nAndP.ehName()).append(" on partition ").append($this.nAndP.partitionId()).append(". seqNo: ").append(seqNo).toString());
        String consumerGroup = (String)this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup());
        ReceiverOptions receiverOptions = new ReceiverOptions();
        receiverOptions.setReceiverRuntimeMetricEnabled(true);
        receiverOptions.setPrefetchCount(BoxesRunTime.unboxToInt((Object)this.ehConf.prefetchCount().getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultPrefetchCount())));
        // Identifier has the form "spark-<executorId>-<taskId>".
        receiverOptions.setIdentifier(new StringBuilder(7).append("spark-").append(SparkEnv$.MODULE$.get().executorId()).append("-").append(taskId).toString());
        Future consumer = RetryUtils$.MODULE$.retryJava((Function0 & Serializable & scala.Serializable)() -> EventHubsUtils$.MODULE$.createReceiverInner(this.client(), $this.ehConf.useExclusiveReceiver(), consumerGroup, ((Object)BoxesRunTime.boxToInteger((int)$this.nAndP.partitionId())).toString(), EventPosition$.MODULE$.fromSequenceNumber(seqNo).convert(), receiverOptions), "CachedReceiver creation.", RetryUtils$.MODULE$.retryJava$default$3(), RetryUtils$.MODULE$.retryJava$default$4());
        return (PartitionReceiver)Await$.MODULE$.result(consumer, (Duration)this.ehConf.internalOperationTimeout());
    }

    /**
     * Returns an already-completed future holding the sequence number of the receiver's
     * current event position, or {@code -1} when that sequence number is {@code null}.
     */
    private Future<Object> lastReceivedOffset() {
        return this.receiver().getEventPosition().getSequenceNumber() != null ? Future$.MODULE$.successful((Object)BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(this.receiver().getEventPosition().getSequenceNumber()))) : Future$.MODULE$.successful((Object)BoxesRunTime.boxToLong((long)-1L));
    }

    /**
     * Starts an asynchronous receive of a single event.
     *
     * @param timeout receive timeout applied to the receiver before receiving
     * @param msg     label handed to the retry helper
     * @return future produced by {@link #receiveOneWithRetry$1}, started with retry count 0
     */
    private Future<Iterable<EventData>> receiveOne(java.time.Duration timeout, String msg) {
        return this.receiveOneWithRetry$1(timeout, msg, 0);
    }

    /**
     * Closes the current receiver through {@code RetryUtils.retryJava} (label
     * "closing a receiver") and returns the resulting future without awaiting it.
     */
    private Future<Void> closeReceiver() {
        return RetryUtils$.MODULE$.retryJava((Function0 & Serializable & scala.Serializable)() -> this.receiver().close(), "closing a receiver", RetryUtils$.MODULE$.retryJava$default$3(), RetryUtils$.MODULE$.retryJava$default$4());
    }

    /**
     * Replaces the current receiver with a new one positioned at {@code seqNo} and logs
     * how long the replacement took.
     *
     * The old receiver is closed (and the close awaited) only when
     * {@code ehConf.useExclusiveReceiver()} is false.
     *
     * @param seqNo sequence number for the replacement receiver
     */
    private void recreateReceiver(long seqNo) {
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        long startTimeNs = System.nanoTime();
        // The result is never read; "object" appears to be a decompiler artifact of a Scala if-without-else.
        Object object = !this.ehConf.useExclusiveReceiver() ? Await$.MODULE$.result(this.closeReceiver(), (Duration)this.ehConf.internalOperationTimeout()) : BoxedUnit.UNIT;
        this.receiver_$eq(this.createReceiver(seqNo));
        long elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(CachedEventHubsReceiver.elapsedTimeNs$1(startTimeNs));
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("(TID ").append(taskId).append(") Finished recreating a receiver for ").append($this.nAndP).append(", ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(": ").append(elapsedTimeMs).append(" ms").toString());
    }

    /*
     * Enabled aggressive block sorting
     */
    /**
     * Makes sure the receiver is positioned at {@code requestSeqNo} and returns the first
     * received event wrapped in a future.
     *
     * Steps, in order:
     * 1. If the last received sequence number is known (greater than -1) and is not
     *    {@code requestSeqNo - 1}, or the receiver is not open, recreate the receiver.
     * 2. Receive one event; if its sequence number equals {@code requestSeqNo}, return it.
     * 3. Otherwise recreate the receiver and receive again; return on a match.
     * 4. Otherwise fetch partition runtime information; if the request is older than the
     *    partition's begin sequence number and the received event is exactly that begin
     *    sequence number, return the received event.
     * 5. Otherwise fail.
     *
     * @param requestSeqNo sequence number the caller wants to start from
     * @return future holding the iterable whose head is the first usable event
     * @throws IllegalStateException when none of the cases above applies
     */
    private Future<Iterable<EventData>> checkCursor(long requestSeqNo) {
        Future future;
        Iterable<EventData> event;
        long receivedSeqNo;
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        long lastReceivedSeqNo = BoxesRunTime.unboxToLong((Object)Await$.MODULE$.result(this.lastReceivedOffset(), (Duration)this.ehConf.internalOperationTimeout()));
        // Step 1: cursor is not directly before the requested event, or the receiver is closed.
        if (lastReceivedSeqNo > -1L && lastReceivedSeqNo + 1L != requestSeqNo || !this.receiver().getIsOpen()) {
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(95).append("(TID ").append(taskId).append(") checkCursor. Recreating a receiver for ").append($this.nAndP).append(", ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(". requestSeqNo: ").append(requestSeqNo).append(", lastReceivedSeqNo: ").append(lastReceivedSeqNo).append(", isOpen: ").append(this.receiver().getIsOpen()).toString());
            this.recreateReceiver(requestSeqNo);
        }
        // Step 2: first attempt; head() is taken without an emptiness check.
        if ((receivedSeqNo = ((EventData)(event = this.awaitReceiveMessage((Awaitable)this.receiveOne((java.time.Duration)this.ehConf.receiverTimeout().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultReceiverTimeout()), "checkCursor initial"), requestSeqNo)).head()).getSystemProperties().getSequenceNumber()) == requestSeqNo) {
            future = Future$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> event, ExecutionContext.Implicits$.MODULE$.global());
            return future;
        }
        // Step 3: mismatch, so rebuild the receiver at the requested position and try once more.
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(81).append("(TID ").append(taskId).append(") checkCursor. Recreating a receiver for ").append($this.nAndP).append(", ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(". requestSeqNo: ").append(requestSeqNo).append(", receivedSeqNo: ").append(receivedSeqNo).toString());
        this.recreateReceiver(requestSeqNo);
        Iterable<EventData> movedEvent = this.awaitReceiveMessage((Awaitable)this.receiveOne((java.time.Duration)this.ehConf.receiverTimeout().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultReceiverTimeout()), "checkCursor move"), requestSeqNo);
        long movedSeqNo = ((EventData)movedEvent.head()).getSystemProperties().getSequenceNumber();
        if (movedSeqNo == requestSeqNo) {
            future = Future$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> movedEvent, ExecutionContext.Implicits$.MODULE$.global());
            return future;
        }
        // Step 4: consult the partition's sequence-number bounds.
        PartitionRuntimeInformation info = (PartitionRuntimeInformation)Await$.MODULE$.result(RetryUtils$.MODULE$.retryJava((Function0 & Serializable & scala.Serializable)() -> this.client().getPartitionRuntimeInformation(((Object)BoxesRunTime.boxToInteger((int)$this.nAndP.partitionId())).toString()), "partitionRuntime", RetryUtils$.MODULE$.retryJava$default$3(), RetryUtils$.MODULE$.retryJava$default$4()), (Duration)this.ehConf.internalOperationTimeout());
        // Accept the event when the request precedes the begin sequence number and we received exactly that begin event.
        if (requestSeqNo < info.getBeginSequenceNumber() && movedSeqNo == info.getBeginSequenceNumber()) {
            future = Future$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> movedEvent, ExecutionContext.Implicits$.MODULE$.global());
            return future;
        }
        // Step 5: nothing matched; report both received sequence numbers and the partition bounds.
        String consumerGroup = (String)this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup());
        throw new IllegalStateException(new StringBuilder(41).append("In partition ").append(info.getPartitionId()).append(" of ").append(info.getEventHubPath()).append(", with consumer group ").append(consumerGroup).append(", ").append(new StringBuilder(71).append("request seqNo ").append(requestSeqNo).append(" is less than the received seqNo ").append(receivedSeqNo).append(". The earliest seqNo is ").toString()).append(new StringBuilder(26).append(info.getBeginSequenceNumber()).append(", the last seqNo is ").append(info.getLastEnqueuedSequenceNumber()).append(", and ").toString()).append(new StringBuilder(15).append("received seqNo ").append(movedSeqNo).toString()).toString());
    }

    /**
     * Receives a batch of events starting at {@code requestSeqNo}.
     *
     * The first event comes from {@link #checkCursor(long)}. The number of events to
     * deliver is {@code requestSeqNo + batchSize - firstSeqNo}, so it shrinks when the
     * first event returned is past the requested sequence number. The remaining events
     * are received one at a time, everything is sorted by sequence number, and the
     * element count is asserted to equal the computed batch count. When a metric plugin
     * is configured, its {@code onReceiveMetric} is called with the batch count, total
     * payload bytes and elapsed milliseconds.
     *
     * @param requestSeqNo sequence number of the first requested event
     * @param batchSize    number of events requested
     * @return iterator over the received events in ascending sequence-number order, or an
     *         empty iterator when the computed batch count is not positive
     */
    public Iterator<EventData> org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(long requestSeqNo, int batchSize) {
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        long startTimeNs = System.nanoTime();
        Iterable first = (Iterable)Await$.MODULE$.result(this.checkCursor(requestSeqNo), (Duration)this.ehConf.internalOperationTimeout());
        long firstSeqNo = ((EventData)first.head()).getSystemProperties().getSequenceNumber();
        // Events still owed to the caller, counting the one already in "first".
        int batchCount = (int)(requestSeqNo + (long)batchSize - firstSeqNo);
        if (batchCount <= 0) {
            return scala.package$.MODULE$.Iterator().empty();
        }
        // Indices 1 until batchCount: one single-event receive per remaining slot.
        IndexedSeq theRest = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(1), batchCount).map((Function1 & Serializable & scala.Serializable)i -> this.awaitReceiveMessage((Awaitable)this.receiveOne((java.time.Duration)this.ehConf.receiverTimeout().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultReceiverTimeout()), new StringBuilder(18).append("receive; ").append(this.nAndP).append("; seqNo: ").append(requestSeqNo + (long)BoxesRunTime.unboxToInt((Object)i)).toString()), requestSeqNo), IndexedSeq$.MODULE$.canBuildFrom());
        Iterable combined = (Iterable)first.$plus$plus((GenTraversableOnce)theRest.flatten((Function1)Predef$.MODULE$.$conforms()), Iterable$.MODULE$.canBuildFrom());
        Iterator sorted = ((IterableLike)combined.toSeq().sortWith((Function2 & Serializable & scala.Serializable)(e1, e2) -> BoxesRunTime.boxToBoolean((boolean)CachedEventHubsReceiver.$anonfun$receive$3(e1, e2)))).iterator();
        // duplicate() yields two iterators over the same elements: one to return, one to consume for validation.
        Tuple2 tuple2 = sorted.duplicate();
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        Iterator result = (Iterator)tuple2._1();
        Iterator validate = (Iterator)tuple2._2();
        // The re-wrapping below looks like residue of a Scala tuple pattern match; it only copies the pair.
        Tuple2 tuple22 = new Tuple2((Object)result, (Object)validate);
        Tuple2 tuple23 = tuple22;
        Iterator result2 = (Iterator)tuple23._1();
        Iterator validate2 = (Iterator)tuple23._2();
        long elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(CachedEventHubsReceiver.elapsedTimeNs$2(startTimeNs));
        if (this.metricPlugin().isDefined()) {
            // Fold the validation iterator into (event count, total payload bytes); (0, 0) when empty.
            Tuple2 tuple24 = (Tuple2)validate2.map((Function1 & Serializable & scala.Serializable)eventData -> new Tuple2.mcIJ.sp(1, (long)eventData.getBytes().length)).reduceOption((Function2 & Serializable & scala.Serializable)(countAndSize1, countAndSize2) -> new Tuple2.mcIJ.sp(countAndSize1._1$mcI$sp() + countAndSize2._1$mcI$sp(), countAndSize1._2$mcJ$sp() + countAndSize2._2$mcJ$sp())).getOrElse((Function0 & Serializable & scala.Serializable)() -> new Tuple2.mcIJ.sp(0, 0L));
            if (tuple24 == null) {
                throw new MatchError((Object)tuple24);
            }
            int validateSize = tuple24._1$mcI$sp();
            long batchSizeInBytes = tuple24._2$mcJ$sp();
            Tuple2.mcIJ.sp sp2 = new Tuple2.mcIJ.sp(validateSize, batchSizeInBytes);
            Tuple2.mcIJ.sp sp3 = sp2;
            int validateSize2 = sp3._1$mcI$sp();
            long batchSizeInBytes2 = sp3._2$mcJ$sp();
            this.metricPlugin().foreach((Function1 & Serializable & scala.Serializable)x$4 -> {
                x$4.onReceiveMetric(this.nAndP, batchCount, batchSizeInBytes2, elapsedTimeMs);
                return BoxedUnit.UNIT;
            });
            // Sanity check: number of events gathered must equal the computed batch count.
            Predef$.MODULE$.assert(validateSize2 == batchCount);
        } else {
            // Same sanity check without the byte accounting.
            Predef$.MODULE$.assert(validate2.size() == batchCount);
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(80).append("(TID ").append(taskId).append(") Finished receiving for ").append($this.nAndP).append(", consumer group: ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(", batchSize: ").append(batchSize).append(", elapsed time: ").append(elapsedTimeMs).append(" ms").toString());
        return result2;
    }

    /**
     * Awaits {@code awaitable} for {@code ehConf.internalOperationTimeout()}.
     *
     * On a {@link TimeoutException} the failure is logged, the receiver is recreated at
     * {@code requestSeqNo}, and the same exception is rethrown to the caller.
     *
     * @param awaitable    pending result to wait for
     * @param requestSeqNo sequence number used for logging and for recreating the receiver
     * @return the awaited value
     */
    private <T> T awaitReceiveMessage(Awaitable<T> awaitable, long requestSeqNo) {
        Object object;
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        try {
            object = Await$.MODULE$.result(awaitable, (Duration)this.ehConf.internalOperationTimeout());
        }
        catch (TimeoutException e) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(95).append("(TID ").append(taskId).append(") awaitReceiveMessage call failed with timeout. Event Hub ").append($this.nAndP).append(", ConsumerGroup ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(". requestSeqNo: ").append(requestSeqNo).toString());
            // Replace the receiver before propagating, so a later call starts from a fresh one.
            this.recreateReceiver(requestSeqNo);
            throw e;
        }
        return (T)object;
    }

    /**
     * Receives one event, first waiting for the receiver to report that it is open.
     *
     * While the receiver is not open and {@code retryCount} is below the package-level
     * {@code RetryCount}, the call is rescheduled through {@code RetryUtils.after} with a
     * delay of {@code WaitInterval} milliseconds and an incremented retry count.
     * Otherwise (receiver open, or retries exhausted) the receive timeout is set and a
     * single-event {@code receive(1)} is issued through {@code RetryUtils.retryNotNull};
     * the Java iterable result is converted to a Scala {@code Iterable}.
     *
     * @param timeout    receive timeout to set on the receiver
     * @param msg        label handed to {@code retryNotNull}
     * @param retryCount number of open-checks already performed
     * @return future of the received events
     */
    private final Future receiveOneWithRetry$1(java.time.Duration timeout, String msg, int retryCount) {
        Future future;
        if (!this.receiver().getIsOpen() && retryCount < package$.MODULE$.RetryCount()) {
            long taskId = EventHubsUtils$.MODULE$.getTaskId();
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(67).append("(TID ").append(taskId).append(") receiver is not opened yet. Will retry {").append(retryCount).append("} ").append($this.nAndP).append(", consumer group: ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).toString());
            int retry = retryCount + 1;
            future = RetryUtils$.MODULE$.after(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(package$.MODULE$.WaitInterval())).milliseconds(), (Function0 & Serializable & scala.Serializable)() -> this.receiveOneWithRetry$1(timeout, msg, retry), ExecutionContext.Implicits$.MODULE$.global());
        } else {
            this.receiver().setReceiveTimeout(timeout);
            future = RetryUtils$.MODULE$.retryNotNull((Function0 & Serializable & scala.Serializable)() -> this.receiver().receive(1), msg).map((Function1 & Serializable & scala.Serializable)x$1 -> (Iterable)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(x$1).asScala(), ExecutionContext.Implicits$.MODULE$.global());
        }
        return future;
    }

    /** Nanoseconds elapsed since {@code startTimeNs$1}; used by {@code recreateReceiver}. */
    private static final long elapsedTimeNs$1(long startTimeNs$1) {
        return System.nanoTime() - startTimeNs$1;
    }

    /** Nanoseconds elapsed since {@code startTimeNs$2}; used by the receive method. */
    private static final long elapsedTimeNs$2(long startTimeNs$2) {
        return System.nanoTime() - startTimeNs$2;
    }

    /**
     * Ordering predicate used when sorting a received batch.
     *
     * @return {@code true} when {@code e1}'s sequence number is strictly less than {@code e2}'s
     */
    public static final /* synthetic */ boolean $anonfun$receive$3(EventData e1, EventData e2) {
        return e1.getSystemProperties().getSequenceNumber() < e2.getSystemProperties().getSequenceNumber();
    }

    /**
     * Stores the configuration and partition, initialises the Logging trait, and creates
     * the initial receiver at {@code startSeqNo}. Because {@code createReceiver} awaits
     * the creation future, construction does not return until the receiver exists or the
     * await fails.
     *
     * @param ehConf     Event Hubs configuration
     * @param nAndP      Event Hub name and partition to receive from
     * @param startSeqNo sequence number for the initial receiver position
     */
    public CachedEventHubsReceiver(EventHubsConf ehConf, NameAndPartition nAndP, long startSeqNo) {
        this.ehConf = ehConf;
        this.nAndP = nAndP;
        Logging.$init$((Logging)this);
        this.receiver = this.createReceiver(startSeqNo);
    }
}

