/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.eventhubs;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Serializable;
import java.lang.invoke.LambdaMetafactory;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventHubsConf$;
import org.apache.spark.eventhubs.NameAndPartition;
import org.apache.spark.eventhubs.client.Client;
import org.apache.spark.eventhubs.package$PartitionPreferredLocationStrategy$;
import org.apache.spark.eventhubs.rdd.EventHubsRDD;
import org.apache.spark.eventhubs.rdd.OffsetRange;
import org.apache.spark.eventhubs.rdd.OffsetRange$;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.eventhubs.EventHubsSource$;
import org.apache.spark.sql.eventhubs.EventHubsSourceOffset;
import org.apache.spark.sql.eventhubs.EventHubsSourceOffset$;
import org.apache.spark.sql.eventhubs.EventHubsSourceProvider$;
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Enumeration;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.Iterable;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.StringOps$;
import scala.collection.mutable.ArrayOps;
import scala.math.Numeric;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\t%a!\u0002\u0014(\u0001-\n\u0004\u0002\u0003%\u0001\u0005\u0003\u0005\u000b\u0011\u0002&\t\u00119\u0003!\u0011!Q\u0001\n=C\u0001b\u0018\u0001\u0003\u0002\u0003\u0006I\u0001\u0018\u0005\u0007A\u0002!\taJ1\t\u0011\u001d\u0004\u0001R1A\u0005\n!D\u0001\u0002\u001d\u0001\t\u0006\u0004%I!\u001d\u0005\bm\u0002\u0011\r\u0011\"\u0003x\u0011\u0019a\b\u0001)A\u0005q\"9Q\u0010\u0001b\u0001\n\u0013q\bBB@\u0001A\u0003%A\fC\u0005\u0002\u0002\u0001\u0011\r\u0011\"\u0003\u0002\u0004!A\u0011Q\u0002\u0001!\u0002\u0013\t)\u0001C\u0005\u0002\u0010\u0001\u0011\r\u0011\"\u0003\u0002\u0012!A\u0011q\u0004\u0001!\u0002\u0013\t\u0019\u0002\u0003\u0006\u0002\"\u0001A)\u0019!C\u0005\u0003GA\u0011\"a\u0013\u0001\u0001\u0004%I!!\u0014\t\u0013\u0005E\u0003\u00011A\u0005\n\u0005M\u0003\u0002CA0\u0001\u0001\u0006K!a\u0014\t\u0013\u0005\u0005\u0004\u00011A\u0005\n\u00055\u0003\"CA2\u0001\u0001\u0007I\u0011BA3\u0011!\tI\u0007\u0001Q!\n\u0005=\u0003bBA6\u0001\u0011\u0005\u0013Q\u000e\u0005\b\u0003w\u0002A\u0011IA?\u0011\u001d\t9\t\u0001C\u0005\u0003\u0013Cq!a$\u0001\t\u0013\t\t\nC\u0004\u0002\"\u0002!\t%a)\t\u000f\u0005u\u0006\u0001\"\u0011\u0002@\"9\u0011\u0011\u0019\u0001\u0005\n\u0005\rw\u0001CAeO!\u0005q%a3\u0007\u000f\u0019:\u0003\u0012A\u0014\u0002N\"1\u0001M\bC\u0001\u0003+D\u0001\"a6\u001f\u0005\u0004%\tA \u0005\b\u00033t\u0002\u0015!\u0003]\u0011%\tYN\bb\u0001\n\u00039\u0013\u000fC\u0004\u0002^z\u0001\u000b\u0011\u0002:\t\u000f\u0005}g\u0004\"\u0001\u0002b\"9\u00111\u001e\u0010\u0005\n\u00055(aD#wK:$\b*\u001e2t'>,(oY3\u000b\u0005!J\u0013!C3wK:$\b.\u001e2t\u0015\tQ3&A\u0002tc2T!\u0001L\u0017\u0002\u000bM\u0004\u0018M]6\u000b\u00059z\u0013AB1qC\u000eDWMC\u00011\u0003\ry'oZ\n\u0005\u0001IR$\t\u0005\u00024q5\tAG\u0003\u00026m\u0005!A.\u00198h\u0015\u00059\u0014\u0001\u00026bm\u0006L!!\u000f\u001b\u0003\r=\u0013'.Z2u!\tY\u0004)D\u0001=\u0015\tid(A\u0005tiJ,\u0017-\\5oO*\u0011q(K\u0001\nKb,7-\u001e;j_:L!!\u0011\u001f\u0003\rM{WO]2f!\t\u0019e)D\u0001E\u0015\t)5&\u0001\u0005j]R,'O\\1m\u0013\t9EIA\u0004M_\u001e<\u0017N\\4\u0002\u0015M\fHnQ8oi\u0016DHo\u0001\u0001\u0011\u0005-cU\"A\u0015\n\u00055K#AC*R\u0019\u000e{g\u000e^3yi\u0006Q\u0001/\u0019:b[\u0016$XM]:\u0011\tAKF\f\u0018\b\u0003#^\u0003\"AU+\u000e\u0003MS!\u0001V%\u0002\rq\u0012xn\u001c;?\u0015\u00051\u0016!B:dC2\f\u0017B\u0001-V\u0003\u0019\u0001&/\u001a3fM&\u0011!l\u0017\u0002\u0004\u001b\u0006\u0004(B\u0001-V!\t\u0001V,\u0003\u0002_7\n11\u000b\u001e:j]\u001e\fA\"\\3uC\u0012\fG/\u0019)bi\"\fa\u0001P5oSRtD\u0003\u00022eK\u001a\u0004\"a\u0019\u0001\u000e\u0003\u001dBQ\u0001\u0013\u0003A\u0002)CQA\u0014\u0003A\u0002=CQa\u0018\u0003A\u0002q\u000b\u0001\"\u001a5DY&,g\u000e^\u000b\u0002SB\u0011!N\\\u0007\u0002W*\u0011A.\\\u0001\u0007G2LWM\u001c;\u000b\u0005!Z\u0013BA8l\u0005\u0019\u0019E.[3oi\u0006q\u0001/\u0019:uSRLwN\\\"pk:$X#\u0001:\u0011\u0005M$X\"A+\n\u0005U,&aA%oi\u00061Q\r[\"p]\u001a,\u0012\u0001\u001f\t\u0003sjl\u0011!\\\u0005\u0003w6\u0014Q\"\u0012<f]RDUOY:D_:4\u0017aB3i\u0007>tg\rI\u0001\u0007K\"t\u0015-\\3\u0016\u0003q\u000bq!\u001a5OC6,\u0007%\u0001\u0002tGV\u0011\u0011Q\u0001\t\u0005\u0003\u000f\tI!D\u0001,\u0013\r\tYa\u000b\u0002\r'B\f'o[\"p]R,\u0007\u0010^\u0001\u0004g\u000e\u0004\u0013\u0001F7bq>3gm]3ugB+'\u000f\u0016:jO\u001e,'/\u0006\u0002\u0002\u0014A)1/!\u0006\u0002\u001a%\u0019\u0011qC+\u0003\r=\u0003H/[8o!\r\u0019\u00181D\u0005\u0004\u0003;)&\u0001\u0002'p]\u001e\fQ#\\1y\u001f\u001a47/\u001a;t!\u0016\u0014HK]5hO\u0016\u0014\b%\u0001\fj]&$\u0018.\u00197QCJ$\u0018\u000e^5p]N+\u0017OT8t+\t\t)\u0003\u0005\u0004Q3\u0006\u001d\u0012Q\u0006\t\u0004s\u0006%\u0012bAA\u0016[\n\u0001b*Y7f\u0003:$\u0007+\u0019:uSRLwN\u001c\t\u0005\u0003_\t)E\u0004\u0003\u00022\u0005\u0005c\u0002BA\u001a\u0003\u007fqA!!\u000e\u0002>9!\u0011qGA\u001e\u001d\r\u0011\u0016\u0011H\u0005\u0002a%\u0011afL\u0005\u0003Y5J!\u0001K\u0016\n\u0007\u0005\rS.A\u0004qC\u000e\\\u0017mZ3\n\t\u0005\u001d\u0013\u0011\n\u0002\u000f'\u0016\fX/\u001a8dK:+XNY3s\u0015\r\t\u0019%\\\u0001\u000eGV\u0014(/\u001a8u'\u0016\fhj\\:\u0016\u0005\u0005=\u0003#B:\u0002\u0016\u0005\u0015\u0012!E2veJ,g\u000e^*fc:{7o\u0018\u0013fcR!\u0011QKA.!\r\u0019\u0018qK\u0005\u0004\u00033*&\u0001B+oSRD\u0011\"!\u0018\u0012\u0003\u0003\u0005\r!a\u0014\u0002\u0007a$\u0013'\u0001\bdkJ\u0014XM\u001c;TKFtun\u001d\u0011\u0002\u001d\u0015\f'\u000f\\5fgR\u001cV-\u001d(pg\u0006\u0011R-\u0019:mS\u0016\u001cHoU3r\u001d>\u001cx\fJ3r)\u0011\t)&a\u001a\t\u0013\u0005uC#!AA\u0002\u0005=\u0013aD3be2LWm\u001d;TKFtun\u001d\u0011\u0002\rM\u001c\u0007.Z7b+\t\ty\u0007\u0005\u0003\u0002r\u0005]TBAA:\u0015\r\t)(K\u0001\u0006if\u0004Xm]\u0005\u0005\u0003s\n\u0019H\u0001\u0006TiJ,8\r\u001e+za\u0016\f\u0011bZ3u\u001f\u001a47/\u001a;\u0016\u0005\u0005}\u0004#B:\u0002\u0016\u0005\u0005\u0005cA\u001e\u0002\u0004&\u0019\u0011Q\u0011\u001f\u0003\r=3gm]3u\u0003Q\tGM[;tiN#\u0018M\u001d;j]\u001e|eMZ:fiR!\u0011QEAF\u0011\u001d\ti\t\u0007a\u0001\u0003K\tAA\u001a:p[\u0006I!/\u0019;f\u0019&l\u0017\u000e\u001e\u000b\u000b\u0003K\t\u0019*a&\u0002\u001a\u0006u\u0005bBAK3\u0001\u0007\u0011\u0011D\u0001\u0006Y&l\u0017\u000e\u001e\u0005\b\u0003\u001bK\u0002\u0019AA\u0013\u0011\u001d\tY*\u0007a\u0001\u0003K\tQ!\u001e8uS2Dq!a(\u001a\u0001\u0004\t)#A\u0004ge>lg*Z<\u0002\u0011\u001d,GOQ1uG\"$b!!*\u00026\u0006e\u0006\u0003BAT\u0003_sA!!+\u0002.:!\u00111GAV\u0013\tQ3&C\u0002\u0002D%JA!!-\u00024\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0004\u0003\u0007J\u0003bBA\\5\u0001\u0007\u0011qP\u0001\u0006gR\f'\u000f\u001e\u0005\b\u0003wS\u0002\u0019AAA\u0003\r)g\u000eZ\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002V\u0005q!/\u001a9peR$\u0015\r^1M_N\u001cH\u0003BA+\u0003\u000bDa!a2\u001d\u0001\u0004a\u0016aB7fgN\fw-Z\u0001\u0010\u000bZ,g\u000e\u001e%vEN\u001cv.\u001e:dKB\u00111MH\n\u0004=\u0005=\u0007cA:\u0002R&\u0019\u00111[+\u0003\r\u0005s\u0017PU3g)\t\tY-\u0001\u0011J]N$(/^2uS>t7OR8s!>$XM\u001c;jC2$\u0015\r^1M_N\u001c\u0018!I%ogR\u0014Xo\u0019;j_:\u001chi\u001c:Q_R,g\u000e^5bY\u0012\u000bG/\u0019'pgN\u0004\u0013a\u0002,F%NKuJT\u0001\t-\u0016\u00136+S(OA\u0005)r-\u001a;T_J$X\rZ#yK\u000e,Ho\u001c:MSN$H\u0003BAr\u0003S\u0004Ba]As9&\u0019\u0011q]+\u0003\u000b\u0005\u0013(/Y=\t\u000f\u0005\u0005A\u00051\u0001\u0002\u0006\u000591m\\7qCJ,GCBAx\u0003k\u0014)\u0001E\u0002t\u0003cL1!a=V\u0005\u001d\u0011un\u001c7fC:Dq!a>&\u0001\u0004\tI0A\u0001b!\u0011\tYP!\u0001\u000e\u0005\u0005u(bAA\u0000W\u0005I1o\u00195fIVdWM]\u0005\u0005\u0005\u0007\tiPA\rFq\u0016\u001cW\u000f^8s\u0007\u0006\u001c\u0007.\u001a+bg.dunY1uS>t\u0007b\u0002B\u0004K\u0001\u0007\u0011\u0011`\u0001\u0002E\u0002")
public class EventHubsSource
implements Source,
Logging {
    private Client ehClient;
    private int partitionCount;
    private Map<NameAndPartition, Object> initialPartitionSeqNos;
    public final SQLContext org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext;
    private final Map<String, String> parameters;
    public final String org$apache$spark$sql$eventhubs$EventHubsSource$$metadataPath;
    private final EventHubsConf ehConf;
    private final String ehName;
    private final SparkContext sc;
    private final Option<Object> maxOffsetsPerTrigger;
    private Option<Map<NameAndPartition, Object>> currentSeqNos;
    private Option<Map<NameAndPartition, Object>> earliestSeqNos;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile byte bitmap$0;

    public static String[] getSortedExecutorList(SparkContext sparkContext) {
        return EventHubsSource$.MODULE$.getSortedExecutorList(sparkContext);
    }

    public static String InstructionsForPotentialDataLoss() {
        return EventHubsSource$.MODULE$.InstructionsForPotentialDataLoss();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void commit(Offset end) {
        Source.commit$((Source)this, (Offset)end);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private Client ehClient$lzycompute() {
        EventHubsSource eventHubsSource = this;
        synchronized (eventHubsSource) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.ehClient = (Client)EventHubsSourceProvider$.MODULE$.clientFactory(this.parameters).apply((Object)this.ehConf());
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        this.parameters = null;
        return this.ehClient;
    }

    private Client ehClient() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.ehClient$lzycompute() : this.ehClient;
    }

    private int partitionCount$lzycompute() {
        EventHubsSource eventHubsSource = this;
        synchronized (eventHubsSource) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.partitionCount = this.ehClient().partitionCount();
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.partitionCount;
    }

    private int partitionCount() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.partitionCount$lzycompute() : this.partitionCount;
    }

    private EventHubsConf ehConf() {
        return this.ehConf;
    }

    private String ehName() {
        return this.ehName;
    }

    private SparkContext sc() {
        return this.sc;
    }

    private Option<Object> maxOffsetsPerTrigger() {
        return this.maxOffsetsPerTrigger;
    }

    private Map<NameAndPartition, Object> initialPartitionSeqNos$lzycompute() {
        EventHubsSource eventHubsSource = this;
        synchronized (eventHubsSource) {
            if ((byte)(this.bitmap$0 & 4) == 0) {
                HDFSMetadataLog<EventHubsSourceOffset> metadataLog = new HDFSMetadataLog<EventHubsSourceOffset>(this){

                    public void serialize(EventHubsSourceOffset metadata, OutputStream out) {
                        out.write(0);
                        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
                        writer.write(new StringBuilder(2).append("v").append(EventHubsSource$.MODULE$.VERSION()).append("\n").toString());
                        writer.write(metadata.json());
                        writer.flush();
                    }

                    /*
                     * Enabled force condition propagation
                     * Lifted jumps to return sites
                     */
                    public EventHubsSourceOffset deserialize(InputStream in) {
                        EventHubsSourceOffset eventHubsSourceOffset;
                        in.read();
                        String content = IOUtils.toString((Reader)new InputStreamReader(in, StandardCharsets.UTF_8));
                        Predef$.MODULE$.assert(content.length() != 0);
                        if (StringOps$.MODULE$.apply$extension(Predef$.MODULE$.augmentString(content), 0) == 'v') {
                            int indexOfNewLine = content.indexOf("\n");
                            if (indexOfNewLine <= 0) throw new IllegalStateException("Log file was malformed.");
                            int version = this.parseVersion(content.substring(0, indexOfNewLine), EventHubsSource$.MODULE$.VERSION());
                            eventHubsSourceOffset = EventHubsSourceOffset$.MODULE$.apply(new SerializedOffset(content.substring(indexOfNewLine + 1)));
                            return eventHubsSourceOffset;
                        } else {
                            eventHubsSourceOffset = EventHubsSourceOffset$.MODULE$.apply(new SerializedOffset(content));
                        }
                        return eventHubsSourceOffset;
                    }
                };
                this.initialPartitionSeqNos = ((EventHubsSourceOffset)((Object)metadataLog.get(0L).getOrElse(() -> EventHubsSource.$anonfun$initialPartitionSeqNos$1(this, (HDFSMetadataLog)metadataLog)))).partitionToSeqNos();
                this.bitmap$0 = (byte)(this.bitmap$0 | 4);
            }
        }
        return this.initialPartitionSeqNos;
    }

    private Map<NameAndPartition, Object> initialPartitionSeqNos() {
        return (byte)(this.bitmap$0 & 4) == 0 ? this.initialPartitionSeqNos$lzycompute() : this.initialPartitionSeqNos;
    }

    private Option<Map<NameAndPartition, Object>> currentSeqNos() {
        return this.currentSeqNos;
    }

    private void currentSeqNos_$eq(Option<Map<NameAndPartition, Object>> x$1) {
        this.currentSeqNos = x$1;
    }

    private Option<Map<NameAndPartition, Object>> earliestSeqNos() {
        return this.earliestSeqNos;
    }

    private void earliestSeqNos_$eq(Option<Map<NameAndPartition, Object>> x$1) {
        this.earliestSeqNos = x$1;
    }

    public StructType schema() {
        return EventHubsSourceProvider$.MODULE$.eventHubsSchema();
    }

    /*
     * Unable to fully structure code
     */
    public Option<Offset> getOffset() {
        block5: {
            block4: {
                this.initialPartitionSeqNos();
                earliestAndLatest = this.ehClient().allBoundedSeqNos();
                this.earliestSeqNos_$eq((Option<Map<NameAndPartition, Object>>)new Some((Object)((Map)earliestAndLatest.map((Function1)(Function1 & Serializable & scala.Serializable)LambdaMetafactory.altMetafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, $anonfun$getOffset$1(org.apache.spark.sql.eventhubs.EventHubsSource scala.Tuple2 ), (Lscala/Tuple2;)Lscala/Tuple2;)((EventHubsSource)this), Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
                latest = ((Map)earliestAndLatest.map((Function1)(Function1 & Serializable & scala.Serializable)LambdaMetafactory.altMetafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, $anonfun$getOffset$2(org.apache.spark.sql.eventhubs.EventHubsSource scala.Tuple2 ), (Lscala/Tuple2;)Lscala/Tuple2;)((EventHubsSource)this), Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
                var5_3 = false;
                var6_4 = null;
                var7_5 = this.maxOffsetsPerTrigger();
                if (!None$.MODULE$.equals(var7_5)) break block4;
                var1_6 = latest;
                break block5;
            }
            if (!(var7_5 instanceof Some)) ** GOTO lbl-1000
            var5_3 = true;
            var6_4 = (Some)var7_5;
            limit = BoxesRunTime.unboxToLong((Object)var6_4.value());
            if (this.currentSeqNos().isEmpty()) {
                startingSeqNos = this.adjustStartingOffset(this.initialPartitionSeqNos());
                var1_6 = this.rateLimit(limit, startingSeqNos, latest, (Map<NameAndPartition, Object>)((Map)this.earliestSeqNos().get()));
            } else if (var5_3) {
                limit = BoxesRunTime.unboxToLong((Object)var6_4.value());
                startingSeqNos = this.adjustStartingOffset((Map<NameAndPartition, Object>)((Map)this.currentSeqNos().get()));
                var1_6 = this.rateLimit(limit, startingSeqNos, latest, (Map<NameAndPartition, Object>)((Map)this.earliestSeqNos().get()));
            } else {
                throw new MatchError(var7_5);
            }
        }
        seqNos = var1_6;
        this.currentSeqNos_$eq((Option<Map<NameAndPartition, Object>>)new Some((Object)seqNos));
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)LambdaMetafactory.altMetafactory(null, null, null, ()Ljava/lang/Object;, $anonfun$getOffset$3(scala.collection.immutable.Map ), ()Ljava/lang/String;)((Map)seqNos));
        return new Some((Object)new EventHubsSourceOffset(seqNos));
    }

    private Map<NameAndPartition, Object> adjustStartingOffset(Map<NameAndPartition, Object> from) {
        return (Map)from.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2;
            Tuple2 tuple22 = x0$1;
            if (tuple22 != null) {
                NameAndPartition nAndP = (NameAndPartition)tuple22._1();
                long seqNo = tuple22._2$mcJ$sp();
                if (seqNo < BoxesRunTime.unboxToLong((Object)((MapLike)this.earliestSeqNos().get()).apply((Object)nAndP))) {
                    this.reportDataLoss(new StringBuilder(112).append("Starting seqNo ").append(seqNo).append(" in partition ").append(nAndP.partitionId()).append(" of EventHub ").append(nAndP.ehName()).append(" ").append(new StringBuilder(40).append("is behind the earliest sequence number ").append(((MapLike)this.earliestSeqNos().get()).apply((Object)nAndP)).append(" ").toString()).append("present in the service. Some events may have expired and been missed.").toString());
                    tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)nAndP), ((MapLike)this.earliestSeqNos().get()).apply((Object)nAndP));
                } else {
                    tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)nAndP), (Object)BoxesRunTime.boxToLong((long)seqNo));
                }
            } else {
                throw new MatchError((Object)tuple22);
            }
            Tuple2 tuple23 = tuple2;
            return tuple23;
        }, Map$.MODULE$.canBuildFrom());
    }

    private Map<NameAndPartition, Object> rateLimit(long limit, Map<NameAndPartition, Object> from, Map<NameAndPartition, Object> until, Map<NameAndPartition, Object> fromNew) {
        Map sizes = (Map)until.flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            NameAndPartition nameAndPartition = (NameAndPartition)tuple2._1();
            long end = tuple2._2$mcJ$sp();
            Iterable iterable = Option$.MODULE$.option2Iterable(from.get((Object)nameAndPartition).orElse((Function0 & Serializable & scala.Serializable)() -> fromNew.get((Object)nameAndPartition)).flatMap((Function1 & Serializable & scala.Serializable)begin -> EventHubsSource.$anonfun$rateLimit$3(this, end, nameAndPartition, BoxesRunTime.unboxToLong((Object)begin))));
            return iterable;
        }, Map$.MODULE$.canBuildFrom());
        double total = BoxesRunTime.unboxToLong((Object)sizes.values().sum((Numeric)Numeric.LongIsIntegral$.MODULE$));
        return total < 1.0 ? until : (Map)until.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple2 tuple2 = x0$2;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            NameAndPartition nameAndPartition = (NameAndPartition)tuple2._1();
            long end = tuple2._2$mcJ$sp();
            Tuple2 tuple22 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)nameAndPartition), sizes.get((Object)nameAndPartition).map((Function1)(JFunction1.mcJJ.sp & Serializable & scala.Serializable)size -> {
                long begin = BoxesRunTime.unboxToLong((Object)from.getOrElse((Object)nameAndPartition, (Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> BoxesRunTime.unboxToLong((Object)fromNew.apply((Object)nameAndPartition))));
                double prorate = (double)limit * ((double)size / total);
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("rateLimit ").append(nameAndPartition).append(" prorated amount is ").append(prorate).toString());
                long off = begin + (long)(prorate < 1.0 ? Math.ceil(prorate) : Math.floor(prorate));
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(25).append("rateLimit ").append(nameAndPartition).append(" new offset is ").append(off).toString());
                return Math.min(end, off);
            }).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> end));
            return tuple22;
        }, Map$.MODULE$.canBuildFrom());
    }

    public Dataset<Row> getBatch(Option<Offset> start, Offset end) {
        Map<NameAndPartition, Object> map;
        Option<Offset> option;
        this.initialPartitionSeqNos();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(40).append("getBatch called with start = ").append(start).append(" and end = ").append(end).toString());
        Map<NameAndPartition, Object> untilSeqNos = EventHubsSourceOffset$.MODULE$.getPartitionSeqNos(end);
        if (this.currentSeqNos().isEmpty()) {
            this.currentSeqNos_$eq((Option<Map<NameAndPartition, Object>>)new Some(untilSeqNos));
        }
        if (start.isDefined()) {
            Object object = start.get();
            Offset offset = end;
            if (!(object != null ? !object.equals(offset) : offset != null)) {
                return this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext.internalCreateDataFrame(this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)), this.schema(), true);
            }
        }
        if (this.earliestSeqNos().isEmpty()) {
            Map<Object, Tuple2<Object, Object>> earliestAndLatest = this.ehClient().allBoundedSeqNos();
            this.earliestSeqNos_$eq((Option<Map<NameAndPartition, Object>>)new Some((Object)((Map)earliestAndLatest.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
                Tuple2 tuple2;
                int p;
                block3: {
                    Tuple2 tuple22;
                    block2: {
                        tuple22 = x0$1;
                        if (tuple22 == null) break block2;
                        p = tuple22._1$mcI$sp();
                        tuple2 = (Tuple2)tuple22._2();
                        if (tuple2 != null) break block3;
                    }
                    throw new MatchError((Object)tuple22);
                }
                long e = tuple2._1$mcJ$sp();
                Tuple2 tuple23 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new NameAndPartition(this.ehName(), p)), (Object)BoxesRunTime.boxToLong((long)e));
                return tuple23;
            }, Map$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())));
        }
        if ((option = start) instanceof Some) {
            Some some = (Some)option;
            Offset prevBatchEndOffset = (Offset)some.value();
            Map<NameAndPartition, Object> startingSeqNos = EventHubsSourceOffset$.MODULE$.getPartitionSeqNos(prevBatchEndOffset);
            map = this.adjustStartingOffset(startingSeqNos);
        } else if (None$.MODULE$.equals(option)) {
            map = this.adjustStartingOffset(this.initialPartitionSeqNos());
        } else {
            throw new MatchError(option);
        }
        Map<NameAndPartition, Object> fromSeqNos = map;
        Seq nameAndPartitions = untilSeqNos.keySet().toSeq();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(12).append("Partitions: ").append(nameAndPartitions.mkString(", ")).toString());
        String[] sortedExecutors = EventHubsSource$.MODULE$.getSortedExecutorList(this.sc());
        int numExecutors = sortedExecutors.length;
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Sorted executors: ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])sortedExecutors)).mkString(", ")).toString());
        OffsetRange[] offsetRanges = (OffsetRange[])((TraversableOnce)((TraversableLike)((TraversableLike)nameAndPartitions.map((Function1 & Serializable & scala.Serializable)np -> {
            int n;
            long fromSeqNo = BoxesRunTime.unboxToLong((Object)fromSeqNos.getOrElse(np, (Function0 & Serializable & scala.Serializable)() -> {
                throw new IllegalStateException(new StringBuilder(25).append(np).append(" doesn't have a fromSeqNo").toString());
            }));
            long untilSeqNo = BoxesRunTime.unboxToLong((Object)untilSeqNos.apply(np));
            Enumeration.Value value = this.ehConf().partitionPreferredLocationStrategy();
            Enumeration.Value value2 = package$PartitionPreferredLocationStrategy$.MODULE$.Hash();
            Enumeration.Value value3 = value;
            if (!(value2 != null ? !value2.equals(value3) : value3 != null)) {
                n = np.hashCode();
            } else {
                Enumeration.Value value4 = package$PartitionPreferredLocationStrategy$.MODULE$.BalancedHash();
                Enumeration.Value value5 = value;
                if (!(value4 != null ? !value4.equals(value5) : value5 != null)) {
                    n = np.ehName().hashCode() + np.partitionId();
                } else {
                    throw new IllegalArgumentException(new StringBuilder(32).append("Unsupported partition strategy: ").append(this.ehConf().partitionPreferredLocationStrategy()).toString());
                }
            }
            int preferredPartitionLocation = n;
            None$ preferredLoc = numExecutors > 0 ? new Some((Object)sortedExecutors[Math.floorMod(preferredPartitionLocation, numExecutors)]) : None$.MODULE$;
            return new Tuple5(np, (Object)BoxesRunTime.boxToLong((long)fromSeqNo), (Object)BoxesRunTime.boxToLong((long)untilSeqNo), (Object)BoxesRunTime.boxToInteger((int)preferredPartitionLocation), (Object)preferredLoc);
        }, Seq$.MODULE$.canBuildFrom())).map((Function1 & Serializable & scala.Serializable)x$3 -> {
            Tuple5 tuple5 = x$3;
            if (tuple5 == null) {
                throw new MatchError((Object)tuple5);
            }
            NameAndPartition np = (NameAndPartition)tuple5._1();
            long fromSeqNo = BoxesRunTime.unboxToLong((Object)tuple5._2());
            long untilSeqNo = BoxesRunTime.unboxToLong((Object)tuple5._3());
            Option preferredLoc = (Option)tuple5._5();
            OffsetRange offsetRange = OffsetRange$.MODULE$.apply(np, fromSeqNo, untilSeqNo, (Option<String>)preferredLoc);
            return offsetRange;
        }, Seq$.MODULE$.canBuildFrom())).filter((Function1 & Serializable & scala.Serializable)range -> BoxesRunTime.boxToBoolean((boolean)EventHubsSource.$anonfun$getBatch$8(this, range)))).toArray(ClassTag$.MODULE$.apply(OffsetRange.class));
        RDD<InternalRow> rdd = EventHubsSourceProvider$.MODULE$.toInternalRow(new EventHubsRDD(this.sc(), this.ehConf().trimmed(), offsetRanges));
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(41).append("GetBatch generating RDD of offset range: ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])offsetRanges)).sortBy((Function1 & Serializable & scala.Serializable)x$4 -> x$4.nameAndPartition().toString(), (Ordering)Ordering.String$.MODULE$))).mkString(", ")).toString());
        return this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext.internalCreateDataFrame(rdd, this.schema(), true);
    }

    public synchronized void stop() {
        this.ehClient().close();
    }

    private void reportDataLoss(String message) {
        this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(0).append(message).append(new StringBuilder(2).append(". ").append(EventHubsSource$.MODULE$.InstructionsForPotentialDataLoss()).toString()).toString());
    }

    public static final /* synthetic */ long $anonfun$maxOffsetsPerTrigger$1(String x$1) {
        return new StringOps(Predef$.MODULE$.augmentString(x$1)).toLong();
    }

    /*
     * WARNING - void declaration
     */
    public static final /* synthetic */ EventHubsSourceOffset $anonfun$initialPartitionSeqNos$1(EventHubsSource $this, HDFSMetadataLog metadataLog$1) {
        void var3_3;
        Map seqNos = (Map)$this.ehClient().translate($this.ehConf(), $this.partitionCount(), $this.ehClient().translate$default$3()).map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            int pId = tuple2._1$mcI$sp();
            long seqNo = tuple2._2$mcJ$sp();
            Tuple2 tuple22 = new Tuple2((Object)new NameAndPartition($this.ehName(), pId), (Object)BoxesRunTime.boxToLong((long)seqNo));
            return tuple22;
        }, Map$.MODULE$.canBuildFrom());
        EventHubsSourceOffset offset = new EventHubsSourceOffset((Map<NameAndPartition, Object>)seqNos);
        metadataLog$1.add(0L, (Object)offset);
        $this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(26).append("Initial sequence numbers: ").append(seqNos).toString());
        return var3_3;
    }

    public static final /* synthetic */ Tuple2 $anonfun$getOffset$1(EventHubsSource $this, Tuple2 x0$1) {
        Tuple2 tuple2;
        int p;
        block3: {
            Tuple2 tuple22;
            block2: {
                tuple22 = x0$1;
                if (tuple22 == null) break block2;
                p = tuple22._1$mcI$sp();
                tuple2 = (Tuple2)tuple22._2();
                if (tuple2 != null) break block3;
            }
            throw new MatchError((Object)tuple22);
        }
        long e = tuple2._1$mcJ$sp();
        Tuple2 tuple23 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new NameAndPartition($this.ehName(), p)), (Object)BoxesRunTime.boxToLong((long)e));
        return tuple23;
    }

    public static final /* synthetic */ Tuple2 $anonfun$getOffset$2(EventHubsSource $this, Tuple2 x0$2) {
        Tuple2 tuple2;
        int p;
        block3: {
            Tuple2 tuple22;
            block2: {
                tuple22 = x0$2;
                if (tuple22 == null) break block2;
                p = tuple22._1$mcI$sp();
                tuple2 = (Tuple2)tuple22._2();
                if (tuple2 != null) break block3;
            }
            throw new MatchError((Object)tuple22);
        }
        long l = tuple2._2$mcJ$sp();
        Tuple2 tuple23 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new NameAndPartition($this.ehName(), p)), (Object)BoxesRunTime.boxToLong((long)l));
        return tuple23;
    }

    public static final /* synthetic */ String $anonfun$getOffset$3(Map seqNos$2) {
        return new StringBuilder(11).append("GetOffset: ").append(((SeqLike)seqNos$2.toSeq().map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.toString(), Seq$.MODULE$.canBuildFrom())).sorted((Ordering)Ordering.String$.MODULE$)).toString();
    }

    public static final /* synthetic */ Option $anonfun$rateLimit$3(EventHubsSource $this, long end$1, NameAndPartition nameAndPartition$1, long begin) {
        long size = end$1 - begin;
        $this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(19).append("rateLimit ").append(nameAndPartition$1).append(" size is ").append(size).toString());
        return size > 0L ? new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)nameAndPartition$1), (Object)BoxesRunTime.boxToLong((long)size))) : None$.MODULE$;
    }

    public static final /* synthetic */ boolean $anonfun$getBatch$8(EventHubsSource $this, OffsetRange range) {
        boolean bl;
        if (range.untilSeqNo() < range.fromSeqNo()) {
            $this.reportDataLoss(new StringBuilder(46).append("Partition ").append(range.nameAndPartition()).append("'s sequence number was changed from ").append(new StringBuilder(36).append(range.fromSeqNo()).append(" to ").append(range.untilSeqNo()).append(", some data may have been missed").toString()).toString());
            bl = false;
        } else {
            bl = true;
        }
        return bl;
    }

    public EventHubsSource(SQLContext sqlContext, Map<String, String> parameters, String metadataPath) {
        this.org$apache$spark$sql$eventhubs$EventHubsSource$$sqlContext = sqlContext;
        this.parameters = parameters;
        this.org$apache$spark$sql$eventhubs$EventHubsSource$$metadataPath = metadataPath;
        Source.$init$((Source)this);
        Logging.$init$((Logging)this);
        this.ehConf = EventHubsConf$.MODULE$.toConf(parameters);
        this.ehName = this.ehConf().name();
        this.sc = sqlContext.sparkContext();
        this.maxOffsetsPerTrigger = Option$.MODULE$.apply(parameters.get((Object)EventHubsConf$.MODULE$.MaxEventsPerTriggerKey()).map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)EventHubsSource.$anonfun$maxOffsetsPerTrigger$1(x$1))).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> this.partitionCount() * 1000));
        this.currentSeqNos = None$.MODULE$;
        this.earliestSeqNos = None$.MODULE$;
    }
}

