/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.eventhubs.client;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.spark.SparkEnv$;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventHubsUtils$;
import org.apache.spark.eventhubs.EventPosition$;
import org.apache.spark.eventhubs.NameAndPartition;
import org.apache.spark.eventhubs.PartitionPerformanceMetric;
import org.apache.spark.eventhubs.PartitionPerformanceMetric$;
import org.apache.spark.eventhubs.client.CachedEventHubsReceiver$;
import org.apache.spark.eventhubs.client.ClientConnectionPool$;
import org.apache.spark.eventhubs.package$;
import org.apache.spark.eventhubs.utils.MetricPlugin;
import org.apache.spark.eventhubs.utils.RetryUtils$;
import org.apache.spark.internal.Logging;
import org.apache.spark.rpc.RpcEndpointRef;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t\u0015c!\u0002\u0011\"\u0001\u0005Z\u0003\u0002\u0003\u001d\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001e\t\u0011y\u0002!\u0011!Q\u0001\n}B\u0001B\u0011\u0001\u0003\u0002\u0003\u0006Ia\u0011\u0005\u0006+\u0002!IAV\u0003\u00059\u0002\u0001Q\f\u0003\u0005h\u0001!\u0015\r\u0011\"\u0003i\u0011!\u0011\u0003\u0001#b\u0001\n\u0013\u0011\bb\u0002@\u0001\u0001\u0004%Ia \u0005\n\u0003\u000f\u0001\u0001\u0019!C\u0005\u0003\u0013A\u0001\"!\u0006\u0001A\u0003&\u0011\u0011\u0001\u0005\b\u0003/\u0001A\u0011BA\r\u0011\u001d\ty\u0002\u0001C\u0005\u0003CAq!a\r\u0001\t\u0013\t)\u0004C\u0004\u0002t\u0001!I!!\u001e\t\u000f\u0005\u0015\u0005\u0001\"\u0003\u0002\b\"9\u00111\u0012\u0001\u0005\n\u00055\u0005bBAJ\u0001\u0011%\u0011Q\u0013\u0005\b\u0003S\u0003A\u0011BAV\u0011\u001d\t\t\u000e\u0001C\u0005\u0003'<\u0001\"a8\"\u0011\u0003)\u0013\u0011\u001d\u0004\bA\u0005B\t!JAr\u0011\u0019)V\u0003\"\u0001\u0002l\"I\u0011Q^\u000bC\u0002\u0013%\u0011q\u001e\u0005\t\u0003c,\u0002\u0015!\u0003\u0002.\u00151\u00111_\u000b\u0001\u0003kD\u0001Ba\u0005\u0016A\u0003%!Q\u0003\u0005\n\u0005/)\"\u0019!C\u0001\u00053A\u0001Ba\n\u0016A\u0003%!1\u0004\u0005\b\u0005S)B\u0011\u0002B\u0016\u0011!\t\u0019*\u0006C!G\tE\u0002b\u0002B\u001e+\u0011\u0005!Q\b\u0002\u0018\u0007\u0006\u001c\u0007.\u001a3Fm\u0016tG\u000fS;cgJ+7-Z5wKJT!AI\u0012\u0002\r\rd\u0017.\u001a8u\u0015\t!S%A\u0005fm\u0016tG\u000f[;cg*\u0011aeJ\u0001\u0006gB\f'o\u001b\u0006\u0003Q%\na!\u00199bG\",'\"\u0001\u0016\u0002\u0007=\u0014xmE\u0002\u0001YI\u0002\"!\f\u0019\u000e\u00039R\u0011aL\u0001\u0006g\u000e\fG.Y\u0005\u0003c9\u0012a!\u00118z%\u00164\u0007CA\u001a7\u001b\u0005!$BA\u001b&\u0003!Ig\u000e^3s]\u0006d\u0017BA\u001c5\u0005\u001daunZ4j]\u001e\fa!\u001a5D_:47\u0001\u0001\t\u0003wqj\u0011aI\u0005\u0003{\r\u0012Q\"\u0012<f]RDUOY:D_:4\u0017!\u00028B]\u0012\u0004\u0006CA\u001eA\u0013\t\t5E\u0001\tOC6,\u0017I\u001c3QCJ$\u0018\u000e^5p]\u0006Q1\u000f^1siN+\u0017OT8\u0011\u0005\u0011\u0013fBA#Q\u001d\t1uJ\u0004\u0002H\u001d:\u0011\u0001*\u0014\b\u0003\u00132k\u0011A\u0013\u0006\u0003\u0017f\na\u0001\u0010:p_Rt\u0014\"\u0001\u0016\n\u0005!J\u0013B\u0001\u0014(\u0013\t!S%\u0003\u0002RG\u00059\u0001/Y2lC\u001e,\u0017BA*U\u00059\u0019V-];f]\u000e,g*^7cKJT!!U\u0012\u0002\rqJg.\u001b;?)\u00119\u0016LW.\u0011\u0005a\u0003Q\"A\u0011\t\u000ba\"\u0001\u0019\u0001\u001e\t\u000by\"\u0001\u0019A \t\u000b\t#\u0001\u0019A\"\u0003+\u0005;\u0018-\u001b;US6,w.\u001e;Fq\u000e,\u0007\u000f^5p]B\u0011a,Z\u0007\u0002?*\u0011\u0001-Y\u0001\u000bG>t7-\u001e:sK:$(B\u00012d\u0003\u0011)H/\u001b7\u000b\u0003\u0011\fAA[1wC&\u0011am\u0018\u0002\u0011)&lWm\\;u\u000bb\u001cW\r\u001d;j_:\fA\"\\3ue&\u001c\u0007\u000b\\;hS:,\u0012!\u001b\t\u0004[)d\u0017BA6/\u0005\u0019y\u0005\u000f^5p]B\u0011Q\u000e]\u0007\u0002]*\u0011qnI\u0001\u0006kRLGn]\u0005\u0003c:\u0014A\"T3ue&\u001c\u0007\u000b\\;hS:,\u0012a\u001d\t\u0003irl\u0011!\u001e\u0006\u0003IYT!a\u001e=\u0002\u000b\u0005TXO]3\u000b\u0005eT\u0018!C7jGJ|7o\u001c4u\u0015\u0005Y\u0018aA2p[&\u0011Q0\u001e\u0002\u000f\u000bZ,g\u000e\u001e%vE\u000ec\u0017.\u001a8u\u0003!\u0011XmY3jm\u0016\u0014XCAA\u0001!\r!\u00181A\u0005\u0004\u0003\u000b)(!\u0005)beRLG/[8o%\u0016\u001cW-\u001b<fe\u0006a!/Z2fSZ,'o\u0018\u0013fcR!\u00111BA\t!\ri\u0013QB\u0005\u0004\u0003\u001fq#\u0001B+oSRD\u0011\"a\u0005\n\u0003\u0003\u0005\r!!\u0001\u0002\u0007a$\u0013'A\u0005sK\u000e,\u0017N^3sA\u0005q1M]3bi\u0016\u0014VmY3jm\u0016\u0014H\u0003BA\u0001\u00037Aa!!\b\f\u0001\u0004\u0019\u0015!B:fc:{\u0017A\u00057bgR\u0014VmY3jm\u0016$wJ\u001a4tKR$\"!a\t\u0011\r\u0005\u0015\u0012\u0011FA\u0017\u001b\t\t9C\u0003\u0002a]%!\u00111FA\u0014\u0005\u00191U\u000f^;sKB\u0019Q&a\f\n\u0007\u0005EbF\u0001\u0003M_:<\u0017A\u0003:fG\u0016Lg/Z(oKR1\u0011qGA(\u0003?\u0002b!!\n\u0002*\u0005e\u0002CBA\u001e\u0003\u0007\nIE\u0004\u0003\u0002>\u0005\u0005cbA%\u0002@%\tq&\u0003\u0002R]%!\u0011QIA$\u0005!IE/\u001a:bE2,'BA)/!\r!\u00181J\u0005\u0004\u0003\u001b*(!C#wK:$H)\u0019;b\u0011\u001d\t\t&\u0004a\u0001\u0003'\nq\u0001^5nK>,H\u000f\u0005\u0003\u0002V\u0005mSBAA,\u0015\r\tIfY\u0001\u0005i&lW-\u0003\u0003\u0002^\u0005]#\u0001\u0003#ve\u0006$\u0018n\u001c8\t\u000f\u0005\u0005T\u00021\u0001\u0002d\u0005\u0019Qn]4\u0011\t\u0005\u0015\u0014Q\u000e\b\u0005\u0003O\nI\u0007\u0005\u0002J]%\u0019\u00111\u000e\u0018\u0002\rA\u0013X\rZ3g\u0013\u0011\ty'!\u001d\u0003\rM#(/\u001b8h\u0015\r\tYGL\u0001\u000eG2|7/\u001a*fG\u0016Lg/\u001a:\u0015\u0005\u0005]\u0004CBA\u0013\u0003S\tI\b\u0005\u0003\u0002|\u0005\u0005UBAA?\u0015\r\tyhY\u0001\u0005Y\u0006tw-\u0003\u0003\u0002\u0004\u0006u$\u0001\u0002,pS\u0012\f\u0001C]3de\u0016\fG/\u001a*fG\u0016Lg/\u001a:\u0015\t\u0005-\u0011\u0011\u0012\u0005\u0007\u0003;y\u0001\u0019A\"\u0002\u0017\rDWmY6DkJ\u001cxN\u001d\u000b\u0005\u0003o\ty\t\u0003\u0004\u0002\u0012B\u0001\raQ\u0001\re\u0016\fX/Z:u'\u0016\fhj\\\u0001\be\u0016\u001cW-\u001b<f)\u0019\t9*!(\u0002 B1\u00111HAM\u0003\u0013JA!a'\u0002H\tA\u0011\n^3sCR|'\u000f\u0003\u0004\u0002\u0012F\u0001\ra\u0011\u0005\b\u0003C\u000b\u0002\u0019AAR\u0003%\u0011\u0017\r^2i'&TX\rE\u0002.\u0003KK1!a*/\u0005\rIe\u000e^\u0001\u0014C^\f\u0017\u000e\u001e*fG\u0016Lg/Z'fgN\fw-Z\u000b\u0005\u0003[\u000b\u0019\f\u0006\u0004\u00020\u0006\u0015\u0017q\u001a\t\u0005\u0003c\u000b\u0019\f\u0004\u0001\u0005\u000f\u0005U&C1\u0001\u00028\n\tA+\u0005\u0003\u0002:\u0006}\u0006cA\u0017\u0002<&\u0019\u0011Q\u0018\u0018\u0003\u000f9{G\u000f[5oOB\u0019Q&!1\n\u0007\u0005\rgFA\u0002B]fDq!a2\u0013\u0001\u0004\tI-A\u0005bo\u0006LG/\u00192mKB1\u0011QEAf\u0003_KA!!4\u0002(\tI\u0011i^1ji\u0006\u0014G.\u001a\u0005\u0007\u0003#\u0013\u0002\u0019A\"\u0002AM,g\u000e\u001a)beRLG/[8o!\u0016\u0014hm\u001c:nC:\u001cW\rV8Ee&4XM\u001d\u000b\u0005\u0003\u0017\t)\u000eC\u0004\u0002XN\u0001\r!!7\u0002)A\f'\u000f^5uS>t\u0007+\u001a:g_Jl\u0017M\\2f!\rY\u00141\\\u0005\u0004\u0003;\u001c#A\u0007)beRLG/[8o!\u0016\u0014hm\u001c:nC:\u001cW-T3ue&\u001c\u0017aF\"bG\",G-\u0012<f]RDUOY:SK\u000e,\u0017N^3s!\tAVcE\u0003\u0016Y\u0005\u0015(\u0007E\u0002Y\u0003OL1!!;\"\u00059\u0019\u0015m\u00195fIJ+7-Z5wKJ$\"!!9\u0002'M$\u0018M\u001d;SK\u000eLWM^3s)&lWMT:\u0016\u0005\u00055\u0012\u0001F:uCJ$(+Z2jKZ,'\u000fV5nK:\u001b\bE\u0001\u0006NkR\f'\r\\3NCB,b!a>\u0003\n\t=\u0001\u0003CA}\u0005\u0007\u00119A!\u0004\u000e\u0005\u0005m(\u0002BA\u007f\u0003\u007f\fq!\\;uC\ndWMC\u0002\u0003\u00029\n!bY8mY\u0016\u001cG/[8o\u0013\u0011\u0011)!a?\u0003\u000f!\u000b7\u000f['baB!\u0011\u0011\u0017B\u0005\t\u001d\u0011Y!\u0007b\u0001\u0003o\u0013\u0011!\u0011\t\u0005\u0003c\u0013y\u0001B\u0004\u0003\u0012e\u0011\r!a.\u0003\u0003\t\u000b\u0011B]3dK&4XM]:\u0011\u000f\u0005e(1AA2/\u0006y\u0002/\u0019:uSRLwN\u001c)fe\u001a|'/\\1oG\u0016\u0014VmY3jm\u0016\u0014(+\u001a4\u0016\u0005\tm\u0001\u0003\u0002B\u000f\u0005Gi!Aa\b\u000b\u0007\t\u0005R%A\u0002sa\u000eLAA!\n\u0003 \tq!\u000b]2F]\u0012\u0004x.\u001b8u%\u00164\u0017\u0001\t9beRLG/[8o!\u0016\u0014hm\u001c:nC:\u001cWMU3dK&4XM\u001d*fM\u0002\n1a[3z)\u0019\t\u0019G!\f\u00030!)\u0001(\ba\u0001u!)a(\ba\u0001\u007fQQ\u0011q\u0013B\u001a\u0005k\u00119D!\u000f\t\u000bar\u0002\u0019\u0001\u001e\t\u000byr\u0002\u0019A \t\r\u0005Ee\u00041\u0001D\u0011\u001d\t\tK\ba\u0001\u0003G\u000bQ!\u00199qYf$ra\u0016B \u0005\u0003\u0012\u0019\u0005C\u00039?\u0001\u0007!\bC\u0003??\u0001\u0007q\bC\u0003C?\u0001\u00071\t")
public class CachedEventHubsReceiver
implements Logging {
    private Option<MetricPlugin> metricPlugin;
    private EventHubClient client;
    private final EventHubsConf ehConf;
    private final NameAndPartition nAndP;
    private PartitionReceiver receiver;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile byte bitmap$0;

    public static CachedEventHubsReceiver apply(EventHubsConf eventHubsConf, NameAndPartition nameAndPartition, long l) {
        return CachedEventHubsReceiver$.MODULE$.apply(eventHubsConf, nameAndPartition, l);
    }

    public static RpcEndpointRef partitionPerformanceReceiverRef() {
        return CachedEventHubsReceiver$.MODULE$.partitionPerformanceReceiverRef();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private Option<MetricPlugin> metricPlugin$lzycompute() {
        CachedEventHubsReceiver cachedEventHubsReceiver = this;
        synchronized (cachedEventHubsReceiver) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.metricPlugin = this.ehConf.metricPlugin();
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.metricPlugin;
    }

    private Option<MetricPlugin> metricPlugin() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.metricPlugin$lzycompute() : this.metricPlugin;
    }

    private EventHubClient client$lzycompute() {
        CachedEventHubsReceiver cachedEventHubsReceiver = this;
        synchronized (cachedEventHubsReceiver) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.client = ClientConnectionPool$.MODULE$.borrowClient(this.ehConf);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.client;
    }

    private EventHubClient client() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.client$lzycompute() : this.client;
    }

    private PartitionReceiver receiver() {
        return this.receiver;
    }

    private void receiver_$eq(PartitionReceiver x$1) {
        this.receiver = x$1;
    }

    private PartitionReceiver createReceiver(long seqNo) {
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(62).append("(TID ").append(taskId).append(") creating receiver for Event Hub ").append($this.nAndP.ehName()).append(" on partition ").append($this.nAndP.partitionId()).append(". seqNo: ").append(seqNo).toString());
        String consumerGroup = (String)this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup());
        ReceiverOptions receiverOptions = new ReceiverOptions();
        receiverOptions.setReceiverRuntimeMetricEnabled(true);
        receiverOptions.setPrefetchCount(BoxesRunTime.unboxToInt((Object)this.ehConf.prefetchCount().getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultPrefetchCount())));
        receiverOptions.setIdentifier(new StringBuilder(7).append("spark-").append(SparkEnv$.MODULE$.get().executorId()).append("-").append(taskId).toString());
        Function0 & Serializable & scala.Serializable intersect = (Function0 & Serializable & scala.Serializable)() -> EventHubsUtils$.MODULE$.createReceiverInner(this.client(), $this.ehConf.useExclusiveReceiver(), consumerGroup, ((Object)BoxesRunTime.boxToInteger((int)$this.nAndP.partitionId())).toString(), EventPosition$.MODULE$.fromSequenceNumber(seqNo).convert(), receiverOptions);
        int n = RetryUtils$.MODULE$.retryJava$default$3();
        int n2 = RetryUtils$.MODULE$.retryJava$default$4();
        RetryUtils$.MODULE$.retryJava$default$5();
        Future consumer = RetryUtils$.MODULE$.retryJava(intersect, "CachedReceiver creation.", n, n2, null);
        return (PartitionReceiver)Await$.MODULE$.result(consumer, (Duration)this.ehConf.internalOperationTimeout());
    }

    private Future<Object> lastReceivedOffset() {
        return this.receiver().getEventPosition().getSequenceNumber() != null ? Future$.MODULE$.successful((Object)BoxesRunTime.boxToLong((long)Predef$.MODULE$.Long2long(this.receiver().getEventPosition().getSequenceNumber()))) : Future$.MODULE$.successful((Object)BoxesRunTime.boxToLong((long)-1L));
    }

    private Future<Iterable<EventData>> receiveOne(java.time.Duration timeout, String msg) {
        return this.receiveOneWithRetry$1(timeout, msg, 0);
    }

    private Future<Void> closeReceiver() {
        Promise dummyResult = Promise$.MODULE$.apply();
        dummyResult.success(null);
        Function0 & Serializable & scala.Serializable x$1 = (Function0 & Serializable & scala.Serializable)() -> this.receiver().close();
        String x$2 = "closing a receiver";
        Future x$3 = dummyResult.future();
        int x$4 = RetryUtils$.MODULE$.retryJava$default$3();
        int x$5 = RetryUtils$.MODULE$.retryJava$default$4();
        return RetryUtils$.MODULE$.retryJava(x$1, x$2, x$4, x$5, x$3);
    }

    private void recreateReceiver(long seqNo) {
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        long startTimeNs = System.nanoTime();
        Object object = !this.ehConf.useExclusiveReceiver() ? Await$.MODULE$.result(this.closeReceiver(), (Duration)this.ehConf.internalOperationTimeout()) : BoxedUnit.UNIT;
        this.receiver_$eq(this.createReceiver(seqNo));
        long elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(CachedEventHubsReceiver.elapsedTimeNs$1(startTimeNs));
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("(TID ").append(taskId).append(") Finished recreating a receiver for ").append($this.nAndP).append(", ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(": ").append(elapsedTimeMs).append(" ms").toString());
    }

    /*
     * Enabled aggressive block sorting
     */
    private Future<Iterable<EventData>> checkCursor(long requestSeqNo) {
        Future future;
        Iterable<EventData> event;
        long receivedSeqNo;
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        long lastReceivedSeqNo = BoxesRunTime.unboxToLong((Object)Await$.MODULE$.result(this.lastReceivedOffset(), (Duration)this.ehConf.internalOperationTimeout()));
        if (lastReceivedSeqNo > -1L && lastReceivedSeqNo + 1L != requestSeqNo || !this.receiver().getIsOpen()) {
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(95).append("(TID ").append(taskId).append(") checkCursor. Recreating a receiver for ").append($this.nAndP).append(", ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(". requestSeqNo: ").append(requestSeqNo).append(", lastReceivedSeqNo: ").append(lastReceivedSeqNo).append(", isOpen: ").append(this.receiver().getIsOpen()).toString());
            this.recreateReceiver(requestSeqNo);
        }
        if ((receivedSeqNo = ((EventData)(event = this.awaitReceiveMessage((Awaitable)this.receiveOne((java.time.Duration)this.ehConf.receiverTimeout().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultReceiverTimeout()), "checkCursor initial"), requestSeqNo)).head()).getSystemProperties().getSequenceNumber()) == requestSeqNo) {
            future = Future$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> event, ExecutionContext.Implicits$.MODULE$.global());
            return future;
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(81).append("(TID ").append(taskId).append(") checkCursor. Recreating a receiver for ").append($this.nAndP).append(", ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(". requestSeqNo: ").append(requestSeqNo).append(", receivedSeqNo: ").append(receivedSeqNo).toString());
        this.recreateReceiver(requestSeqNo);
        Iterable<EventData> movedEvent = this.awaitReceiveMessage((Awaitable)this.receiveOne((java.time.Duration)this.ehConf.receiverTimeout().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultReceiverTimeout()), "checkCursor move"), requestSeqNo);
        long movedSeqNo = ((EventData)movedEvent.head()).getSystemProperties().getSequenceNumber();
        if (movedSeqNo == requestSeqNo) {
            future = Future$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> movedEvent, ExecutionContext.Implicits$.MODULE$.global());
            return future;
        }
        Function0 & Serializable & scala.Serializable intersect = (Function0 & Serializable & scala.Serializable)() -> this.client().getPartitionRuntimeInformation(((Object)BoxesRunTime.boxToInteger((int)$this.nAndP.partitionId())).toString());
        int n = RetryUtils$.MODULE$.retryJava$default$3();
        int n2 = RetryUtils$.MODULE$.retryJava$default$4();
        RetryUtils$.MODULE$.retryJava$default$5();
        PartitionRuntimeInformation info = (PartitionRuntimeInformation)Await$.MODULE$.result(RetryUtils$.MODULE$.retryJava(intersect, "partitionRuntime", n, n2, null), (Duration)this.ehConf.internalOperationTimeout());
        if (requestSeqNo < info.getBeginSequenceNumber() && movedSeqNo == info.getBeginSequenceNumber()) {
            future = Future$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> movedEvent, ExecutionContext.Implicits$.MODULE$.global());
            return future;
        }
        String consumerGroup = (String)this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup());
        throw new IllegalStateException(new StringBuilder(41).append("In partition ").append(info.getPartitionId()).append(" of ").append(info.getEventHubPath()).append(", with consumer group ").append(consumerGroup).append(", ").append(new StringBuilder(71).append("request seqNo ").append(requestSeqNo).append(" is less than the received seqNo ").append(receivedSeqNo).append(". The earliest seqNo is ").toString()).append(new StringBuilder(26).append(info.getBeginSequenceNumber()).append(", the last seqNo is ").append(info.getLastEnqueuedSequenceNumber()).append(", and ").toString()).append(new StringBuilder(15).append("received seqNo ").append(movedSeqNo).toString()).toString());
    }

    public Iterator<EventData> org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(long requestSeqNo, int batchSize) {
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        long startTimeNs = System.nanoTime();
        Iterable first = (Iterable)Await$.MODULE$.result(this.checkCursor(requestSeqNo), (Duration)this.ehConf.internalOperationTimeout());
        long firstSeqNo = ((EventData)first.head()).getSystemProperties().getSequenceNumber();
        int batchCount = (int)(requestSeqNo + (long)batchSize - firstSeqNo);
        if (batchCount <= 0) {
            return scala.package$.MODULE$.Iterator().empty();
        }
        IndexedSeq theRest = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(1), batchCount).map((Function1 & Serializable & scala.Serializable)i -> this.awaitReceiveMessage((Awaitable)this.receiveOne((java.time.Duration)this.ehConf.receiverTimeout().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultReceiverTimeout()), new StringBuilder(18).append("receive; ").append(this.nAndP).append("; seqNo: ").append(requestSeqNo + (long)BoxesRunTime.unboxToInt((Object)i)).toString()), requestSeqNo), IndexedSeq$.MODULE$.canBuildFrom());
        Iterable combined = (Iterable)first.$plus$plus((GenTraversableOnce)theRest.flatten((Function1)Predef$.MODULE$.$conforms()), Iterable$.MODULE$.canBuildFrom());
        Iterator sorted = ((IterableLike)combined.toSeq().sortWith((Function2 & Serializable & scala.Serializable)(e1, e2) -> BoxesRunTime.boxToBoolean((boolean)CachedEventHubsReceiver.$anonfun$receive$3(e1, e2)))).iterator();
        Tuple2 tuple2 = sorted.duplicate();
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        Iterator result = (Iterator)tuple2._1();
        Iterator validate = (Iterator)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)result, (Object)validate);
        Tuple2 tuple23 = tuple22;
        Iterator result2 = (Iterator)tuple23._1();
        Iterator validate2 = (Iterator)tuple23._2();
        long elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(CachedEventHubsReceiver.elapsedTimeNs$2(startTimeNs));
        if (this.ehConf.slowPartitionAdjustment()) {
            this.sendPartitionPerformanceToDriver(new PartitionPerformanceMetric(this.nAndP, EventHubsUtils$.MODULE$.getTaskContextSlim(), requestSeqNo, batchCount, elapsedTimeMs));
        }
        if (this.metricPlugin().isDefined()) {
            Tuple2 tuple24 = (Tuple2)validate2.map((Function1 & Serializable & scala.Serializable)eventData -> new Tuple2.mcIJ.sp(1, (long)eventData.getBytes().length)).reduceOption((Function2 & Serializable & scala.Serializable)(countAndSize1, countAndSize2) -> new Tuple2.mcIJ.sp(countAndSize1._1$mcI$sp() + countAndSize2._1$mcI$sp(), countAndSize1._2$mcJ$sp() + countAndSize2._2$mcJ$sp())).getOrElse((Function0 & Serializable & scala.Serializable)() -> new Tuple2.mcIJ.sp(0, 0L));
            if (tuple24 == null) {
                throw new MatchError((Object)tuple24);
            }
            int validateSize = tuple24._1$mcI$sp();
            long batchSizeInBytes = tuple24._2$mcJ$sp();
            Tuple2.mcIJ.sp sp2 = new Tuple2.mcIJ.sp(validateSize, batchSizeInBytes);
            Tuple2.mcIJ.sp sp3 = sp2;
            int validateSize2 = sp3._1$mcI$sp();
            long batchSizeInBytes2 = sp3._2$mcJ$sp();
            this.metricPlugin().foreach((Function1 & Serializable & scala.Serializable)x$4 -> {
                x$4.onReceiveMetric(EventHubsUtils$.MODULE$.getTaskContextSlim(), this.nAndP, batchCount, batchSizeInBytes2, elapsedTimeMs);
                return BoxedUnit.UNIT;
            });
            Predef$.MODULE$.assert(validateSize2 == batchCount);
        } else {
            Predef$.MODULE$.assert(validate2.size() == batchCount);
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(80).append("(TID ").append(taskId).append(") Finished receiving for ").append($this.nAndP).append(", consumer group: ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(", batchSize: ").append(batchSize).append(", elapsed time: ").append(elapsedTimeMs).append(" ms").toString());
        return result2;
    }

    private <T> T awaitReceiveMessage(Awaitable<T> awaitable, long requestSeqNo) {
        Object object;
        long taskId = EventHubsUtils$.MODULE$.getTaskId();
        try {
            object = Await$.MODULE$.result(awaitable, (Duration)this.ehConf.internalOperationTimeout());
        }
        catch (TimeoutException e) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(95).append("(TID ").append(taskId).append(") awaitReceiveMessage call failed with timeout. Event Hub ").append($this.nAndP).append(", ConsumerGroup ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).append(". requestSeqNo: ").append(requestSeqNo).toString());
            this.recreateReceiver(requestSeqNo);
            throw e;
        }
        return (T)object;
    }

    private void sendPartitionPerformanceToDriver(PartitionPerformanceMetric partitionPerformance) {
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("(Task: ").append(EventHubsUtils$.MODULE$.getTaskContextSlim()).append(") sends PartitionPerformanceMetric: ").append(new StringBuilder(15).append((Object)PartitionPerformanceMetric$.MODULE$).append(" to the driver.").toString()).toString());
        try {
            CachedEventHubsReceiver$.MODULE$.partitionPerformanceReceiverRef().send((Object)partitionPerformance);
        }
        catch (Exception e) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(51).append("(Task: ").append(EventHubsUtils$.MODULE$.getTaskContextSlim()).append(") failed to send the RPC message containing ").append(new StringBuilder(43).append("PartitionPerformanceMetric: ").append((Object)PartitionPerformanceMetric$.MODULE$).append(" to the driver.").toString()).toString());
        }
    }

    private final Future receiveOneWithRetry$1(java.time.Duration timeout, String msg, int retryCount) {
        Future future;
        if (!this.receiver().getIsOpen() && retryCount < package$.MODULE$.RetryCount()) {
            long taskId = EventHubsUtils$.MODULE$.getTaskId();
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(67).append("(TID ").append(taskId).append(") receiver is not opened yet. Will retry {").append(retryCount).append("} ").append($this.nAndP).append(", consumer group: ").append($this.ehConf.consumerGroup().getOrElse((Function0 & Serializable & scala.Serializable)() -> package$.MODULE$.DefaultConsumerGroup())).toString());
            int retry = retryCount + 1;
            future = RetryUtils$.MODULE$.after(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(package$.MODULE$.WaitInterval())).milliseconds(), (Function0 & Serializable & scala.Serializable)() -> this.receiveOneWithRetry$1(timeout, msg, retry), ExecutionContext.Implicits$.MODULE$.global());
        } else {
            this.receiver().setReceiveTimeout(timeout);
            future = RetryUtils$.MODULE$.retryNotNull((Function0 & Serializable & scala.Serializable)() -> this.receiver().receive(1), msg).map((Function1 & Serializable & scala.Serializable)x$1 -> (Iterable)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(x$1).asScala(), ExecutionContext.Implicits$.MODULE$.global());
        }
        return future;
    }

    private static final long elapsedTimeNs$1(long startTimeNs$1) {
        return System.nanoTime() - startTimeNs$1;
    }

    private static final long elapsedTimeNs$2(long startTimeNs$2) {
        return System.nanoTime() - startTimeNs$2;
    }

    public static final /* synthetic */ boolean $anonfun$receive$3(EventData e1, EventData e2) {
        return e1.getSystemProperties().getSequenceNumber() < e2.getSystemProperties().getSequenceNumber();
    }

    public CachedEventHubsReceiver(EventHubsConf ehConf, NameAndPartition nAndP, long startSeqNo) {
        this.ehConf = ehConf;
        this.nAndP = nAndP;
        Logging.$init$((Logging)this);
        this.receiver = this.createReceiver(startSeqNo);
    }
}

