/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.kafka011;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.kafka011.ConsumerStrategy;
import org.apache.spark.sql.kafka011.KafkaOffsetRangeLimit$;
import org.apache.spark.sql.kafka011.KafkaSourceOffset;
import org.apache.spark.util.ThreadUtils$;
import org.apache.spark.util.UninterruptibleThread;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Set$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005uh!B\u0001\u0003\u0001\ta!!E&bM.\fwJ\u001a4tKR\u0014V-\u00193fe*\u00111\u0001B\u0001\tW\u000647.\u0019\u00192c)\u0011QAB\u0001\u0004gFd'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\u0007\u0001i1\u0003\u0005\u0002\u000f#5\tqBC\u0001\u0011\u0003\u0015\u00198-\u00197b\u0013\t\u0011rB\u0001\u0004B]f\u0014VM\u001a\t\u0003)]i\u0011!\u0006\u0006\u0003-\u0019\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u00031U\u0011q\u0001T8hO&tw\r\u0003\u0005\u001b\u0001\t\u0005\t\u0015!\u0003\u001d\u0003A\u0019wN\\:v[\u0016\u00148\u000b\u001e:bi\u0016<\u0017p\u0001\u0001\u0011\u0005uqR\"\u0001\u0002\n\u0005}\u0011!\u0001E\"p]N,X.\u001a:TiJ\fG/Z4z\u0011!\t\u0003A!A!\u0002\u0013\u0011\u0013!\u00053sSZ,'oS1gW\u0006\u0004\u0016M]1ngB!1\u0005\u000b\u00166\u001b\u0005!#BA\u0013'\u0003\u0011)H/\u001b7\u000b\u0003\u001d\nAA[1wC&\u0011\u0011\u0006\n\u0002\u0004\u001b\u0006\u0004\bCA\u00163\u001d\ta\u0003\u0007\u0005\u0002.\u001f5\taF\u0003\u000207\u00051AH]8pizJ!!M\b\u0002\rA\u0013X\rZ3g\u0013\t\u0019DG\u0001\u0004TiJLgn\u001a\u0006\u0003c=\u0001\"AN\u001d\u000e\u0003]R!\u0001\u000f\u0014\u0002\t1\fgnZ\u0005\u0003u]\u0012aa\u00142kK\u000e$\b\u0002\u0003\u001f\u0001\u0005\u0003\u0005\u000b\u0011B\u001f\u0002\u001bI,\u0017\rZ3s\u001fB$\u0018n\u001c8t!\u0011YcH\u000b\u0016\n\u0005%\"\u0004\u0002\u0003!\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0016\u0002'\u0011\u0014\u0018N^3s\u000fJ|W\u000f]%e!J,g-\u001b=\t\u000b\t\u0003A\u0011A\"\u0002\rqJg.\u001b;?)\u0015!UIR$I!\ti\u0002\u0001C\u0003\u001b\u0003\u0002\u0007A\u0004C\u0003\"\u0003\u0002\u0007!\u0005C\u0003=\u0003\u0002\u0007Q\bC\u0003A\u0003\u0002\u0007!\u0006C\u0004K\u0001\t\u0007I\u0011A&\u0002#-\fgm[1SK\u0006$WM\u001d+ie\u0016\fG-F\u0001M!\ti\u0005+D\u0001O\u0015\tyE%\u0001\u0006d_:\u001cWO\u001d:f]RL!!\u0015(\u0003\u001f\u0015CXmY;u_J\u001cVM\u001d<jG\u0016Daa\u0015\u0001!\u0002\u0013a\u0015AE6bM.\f'+Z1eKJ$\u0006N]3bI\u0002Bq!
\u0016\u0001C\u0002\u0013\u0005a+A\u0006fq\u0016\u001c7i\u001c8uKb$X#A,\u0011\u0005aSV\"A-\u000b\u0005={\u0011BA.Z\u0005})\u00050Z2vi&|gnQ8oi\u0016DH/\u0012=fGV$xN]*feZL7-\u001a\u0005\u0007;\u0002\u0001\u000b\u0011B,\u0002\u0019\u0015DXmY\"p]R,\u0007\u0010\u001e\u0011\t\u000f}\u0003\u0001\u0019!C\u0005A\u00069qM]8va&#W#\u0001\u0016\t\u000f\t\u0004\u0001\u0019!C\u0005G\u0006YqM]8va&#w\fJ3r)\t!w\r\u0005\u0002\u000fK&\u0011am\u0004\u0002\u0005+:LG\u000fC\u0004iC\u0006\u0005\t\u0019\u0001\u0016\u0002\u0007a$\u0013\u0007\u0003\u0004k\u0001\u0001\u0006KAK\u0001\tOJ|W\u000f]%eA!9A\u000e\u0001a\u0001\n\u0013i\u0017A\u00028fqRLE-F\u0001o!\tqq.\u0003\u0002q\u001f\t\u0019\u0011J\u001c;\t\u000fI\u0004\u0001\u0019!C\u0005g\u0006Qa.\u001a=u\u0013\u0012|F%Z9\u0015\u0005\u0011$\bb\u00025r\u0003\u0003\u0005\rA\u001c\u0005\u0007m\u0002\u0001\u000b\u0015\u00028\u0002\u000f9,\u0007\u0010^%eA!9\u0001\u0010\u0001a\u0001\n#I\u0018!C0d_:\u001cX/\\3s+\u0005Q\bcB>\u0002\u0006\u0005%\u0011\u0011B\u0007\u0002y*\u0011QP`\u0001\tG>t7/^7fe*\u0019q0!\u0001\u0002\u000f\rd\u0017.\u001a8ug*\u0019\u00111\u0001\u0005\u0002\u000b-\fgm[1\n\u0007\u0005\u001dAP\u0001\u0005D_:\u001cX/\\3s!\u0015q\u00111BA\b\u0013\r\tia\u0004\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0004\u001d\u0005E\u0011bAA\n\u001f\t!!)\u001f;f\u0011%\t9\u0002\u0001a\u0001\n#\tI\"A\u0007`G>t7/^7fe~#S-\u001d\u000b\u0004I\u0006m\u0001\u0002\u00035\u0002\u0016\u0005\u0005\t\u0019\u0001>\t\u000f\u0005}\u0001\u0001)Q\u0005u\u0006QqlY8ogVlWM\u001d\u0011)\t\u0005u\u00111\u0005\t\u0004\u001d\u0005\u0015\u0012bAA\u0014\u001f\tAao\u001c7bi&dW\rC\u0003~\u0001\u0011E\u0011\u0010\u0003\u0005\u0002.\u0001\u0011\r\u0011\"\u0003n\u0003Yi\u0017\r_(gMN,GOR3uG\"\fE\u000f^3naR\u001c\bbBA\u0019\u0001\u0001\u0006IA\\\u0001\u0018[\u0006DxJ\u001a4tKR4U\r^2i\u0003R$X-\u001c9ug\u0002B\u0011\"!\u000e\u0001\u0005\u0004%I!a\u000e\u00029=4gm]3u\r\u0016$8\r[!ui\u0016l\u0007\u000f^%oi\u0016\u0014h/\u00197NgV\u0011\u0011\u0011\b\t\u0004\u001d\u0005m\u0012bAA\u001f\u001f\t
!Aj\u001c8h\u0011!\t\t\u0005\u0001Q\u0001\n\u0005e\u0012!H8gMN,GOR3uG\"\fE\u000f^3naRLe\u000e^3sm\u0006dWj\u001d\u0011\t\u000f\u0005\u0015\u0003\u0001\"\u0003\u0002H\u0005Ya.\u001a=u\u000fJ|W\u000f]%e)\u0005Q\u0003bBA&\u0001\u0011\u0005\u0013qI\u0001\ti>\u001cFO]5oO\"9\u0011q\n\u0001\u0005\u0002\u0005E\u0013!B2m_N,G#\u00013\t\u000f\u0005U\u0003\u0001\"\u0001\u0002X\u0005!b-\u001a;dQR{\u0007/[2QCJ$\u0018\u000e^5p]N$\"!!\u0017\u0011\u000b-\nY&a\u0018\n\u0007\u0005uCGA\u0002TKR\u0004B!!\u0019\u0002h5\u0011\u00111\r\u0006\u0005\u0003K\n\t!\u0001\u0004d_6lwN\\\u0005\u0005\u0003S\n\u0019G\u0001\bU_BL7\rU1si&$\u0018n\u001c8\t\u000f\u00055\u0004\u0001\"\u0001\u0002p\u0005!b-\u001a;dQN\u0003XmY5gS\u000e|eMZ:fiN$b!!\u001d\u0002x\u0005u\u0004cA\u000f\u0002t%\u0019\u0011Q\u000f\u0002\u0003#-\u000bgm[1T_V\u00148-Z(gMN,G\u000f\u0003\u0005\u0002z\u0005-\u0004\u0019AA>\u0003A\u0001\u0018M\u001d;ji&|gn\u00144gg\u0016$8\u000f\u0005\u0004,}\u0005}\u0013\u0011\b\u0005\t\u0003\u007f\nY\u00071\u0001\u0002\u0002\u0006q!/\u001a9peR$\u0015\r^1M_N\u001c\b#\u0002\b\u0002\u0004*\"\u0017bAAC\u001f\tIa)\u001e8di&|g.\r\u0005\b\u0003\u0013\u0003A\u0011AAF\u0003Q1W\r^2i\u000b\u0006\u0014H.[3ti>3gm]3ugR\u0011\u00111\u0010\u0005\b\u0003\u001f\u0003A\u0011AAI\u0003I1W\r^2i\u0019\u0006$Xm\u001d;PM\u001a\u001cX\r^:\u0015\t\u0005M\u0015\u0011\u0015\t\u0005\u0003+\u000bYJD\u0002\u001e\u0003/K1!!'\u0003\u0003\u001d\u0001\u0018mY6bO\u0016LA!!(\u0002 
\n\u0011\u0002+\u0019:uSRLwN\\(gMN,G/T1q\u0015\r\tIJ\u0001\u0005\t\u0003G\u000bi\t1\u0001\u0002&\u0006a1N\\8x]>3gm]3ugB)a\"a*\u0002\u0014&\u0019\u0011\u0011V\b\u0003\r=\u0003H/[8o\u0011\u001d\tI\t\u0001C\u0001\u0003[#B!a\u001f\u00020\"A\u0011\u0011WAV\u0001\u0004\t\u0019,A\u0007oK^\u0004\u0016M\u001d;ji&|gn\u001d\t\u0007\u0003k\u000bi,a\u0018\u000f\t\u0005]\u00161\u0018\b\u0004[\u0005e\u0016\"\u0001\t\n\u0007\u0005eu\"\u0003\u0003\u0002@\u0006\u0005'aA*fc*\u0019\u0011\u0011T\b\t\u000f\u0005\u0015\u0007\u0001\"\u0003\u0002H\u0006\u0011\"/\u001e8V]&tG/\u001a:skB$\u0018N\u00197z+\u0011\tI-a4\u0015\t\u0005-\u0017\u0011\u001d\t\u0005\u0003\u001b\fy\r\u0004\u0001\u0005\u0011\u0005E\u00171\u0019b\u0001\u0003'\u0014\u0011\u0001V\t\u0005\u0003+\fY\u000eE\u0002\u000f\u0003/L1!!7\u0010\u0005\u001dqu\u000e\u001e5j]\u001e\u00042ADAo\u0013\r\tyn\u0004\u0002\u0004\u0003:L\b\"CAr\u0003\u0007$\t\u0019AAs\u0003\u0011\u0011w\u000eZ=\u0011\u000b9\t9/a3\n\u0007\u0005%xB\u0001\u0005=Eft\u0017-\\3?\u0011\u001d\ti\u000f\u0001C\u0005\u0003_\f1d^5uQJ+GO]5fg^KG\u000f[8vi&sG/\u001a:skB$H\u0003BA>\u0003cD\u0011\"a9\u0002l\u0012\u0005\r!a=\u0011\u000b9\t9/a\u001f\t\u000f\u0005]\b\u0001\"\u0003\u0002R\u0005a1\u000f^8q\u0007>t7/^7fe\"9\u00111 \u0001\u0005\n\u0005E\u0013!\u0004:fg\u0016$8i\u001c8tk6,'\u000f")
/*
 * Reads topic-partition assignments and offsets from Kafka through a single consumer that is
 * created lazily by the supplied ConsumerStrategy.
 *
 * Every Kafka call is executed on an UninterruptibleThread: either the calling thread, when it
 * already is one, or the dedicated daemon thread named "Kafka Offset Reader" that backs
 * execContext. Failed offset fetches are retried; the retry count and pause are read from the
 * reader options "fetchOffset.numRetries" (default "3") and "fetchOffset.retryIntervalMs"
 * (default "1000").
 *
 * NOTE: this source is decompiler output for a Scala class. The synthetic "$anonfun$..." helpers
 * and the intersection-cast lambdas are kept so names and signatures match the compiled class.
 */
public class KafkaOffsetReader
implements Logging {
    private final ConsumerStrategy consumerStrategy;
    private final Map<String, Object> driverKafkaParams;
    private final String driverGroupIdPrefix;
    private final ExecutorService kafkaReaderThread;
    private final ExecutionContextExecutorService execContext;
    // Group id handed to the most recently created consumer; null until the first one is built.
    private String groupId;
    // Suffix used for the next generated group id.
    private int nextId;
    // Lazily created consumer; set back to null by resetConsumer().
    private volatile Consumer<byte[], byte[]> _consumer;
    private final int maxOffsetFetchAttempts;
    private final long offsetFetchAttemptIntervalMs;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // ---------------------------------------------------------------------------------------
    // Forwarders for the Logging trait: each one delegates to the matching static method on
    // the Logging interface.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    // ---------------------------------------------------------------------------------------
    // Field accessors (Scala val/var getters and setters).
    // ---------------------------------------------------------------------------------------

    /** Returns the single-thread executor whose worker is an UninterruptibleThread. */
    public ExecutorService kafkaReaderThread() {
        return this.kafkaReaderThread;
    }

    /** Returns the execution context that wraps {@link #kafkaReaderThread()}. */
    public ExecutionContextExecutorService execContext() {
        return this.execContext;
    }

    private String groupId() {
        return this.groupId;
    }

    private void groupId_$eq(String x$1) {
        this.groupId = x$1;
    }

    private int nextId() {
        return this.nextId;
    }

    private void nextId_$eq(int x$1) {
        this.nextId = x$1;
    }

    /** Returns the current consumer without creating one; may be null. */
    public Consumer<byte[], byte[]> _consumer() {
        return this._consumer;
    }

    public void _consumer_$eq(Consumer<byte[], byte[]> x$1) {
        this._consumer = x$1;
    }

    /**
     * Returns the consumer, creating it on first use (or after a reset) from a copy of the driver
     * Kafka params with "group.id" replaced by a freshly generated group id.
     * Asserts that the calling thread is an UninterruptibleThread.
     */
    public synchronized Consumer<byte[], byte[]> consumer() {
        Predef$.MODULE$.assert(Thread.currentThread() instanceof UninterruptibleThread);
        if (this._consumer() == null) {
            HashMap<String, Object> newKafkaParams = new HashMap<String, Object>(this.driverKafkaParams);
            newKafkaParams.put("group.id", this.nextGroupId());
            this._consumer_$eq(this.consumerStrategy.createConsumer(newKafkaParams));
        }
        return this._consumer();
    }

    private int maxOffsetFetchAttempts() {
        return this.maxOffsetFetchAttempts;
    }

    private long offsetFetchAttemptIntervalMs() {
        return this.offsetFetchAttemptIntervalMs;
    }

    /** Builds "&lt;driverGroupIdPrefix&gt;-&lt;nextId&gt;", stores it as the group id and advances the counter. */
    private String nextGroupId() {
        this.groupId_$eq(new StringBuilder(1).append(this.driverGroupIdPrefix).append("-").append(this.nextId()).toString());
        this.nextId_$eq(this.nextId() + 1);
        return this.groupId();
    }

    /** Returns the string form of the consumer strategy. */
    @Override
    public String toString() {
        return this.consumerStrategy.toString();
    }

    /**
     * Closes the consumer, if one was created, on an uninterruptible thread and then shuts down
     * the reader executor.
     */
    public void close() {
        if (this._consumer() != null) {
            this.runUninterruptibly((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.stopConsumer());
        }
        this.kafkaReaderThread().shutdown();
    }

    /**
     * Polls once with a zero timeout, pauses every assigned partition and returns the assignment
     * as an immutable Scala set.
     */
    public scala.collection.immutable.Set<TopicPartition> fetchTopicPartitions() {
        return (scala.collection.immutable.Set)this.runUninterruptibly((Function0 & Serializable & scala.Serializable)() -> {
            Predef$.MODULE$.assert(Thread.currentThread() instanceof UninterruptibleThread);
            this.consumer().poll(0L);
            Set partitions = this.consumer().assignment();
            this.consumer().pause((Collection)partitions);
            return ((TraversableOnce)JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala()).toSet();
        });
    }

    /**
     * Seeks every partition in {@code partitionOffsets} to its requested position and returns the
     * positions the consumer actually reports.
     *
     * The assigned partitions must equal the key set of {@code partitionOffsets}; otherwise the
     * assertion fails with the message built below. Offsets equal to
     * KafkaOffsetRangeLimit.LATEST / EARLIEST seek to the end / beginning; any other value is used
     * as an explicit offset. For explicit offsets, {@code reportDataLoss} is invoked when the
     * fetched position differs from the requested one.
     *
     * @param partitionOffsets requested offset per partition
     * @param reportDataLoss   callback receiving a description of each mismatch
     * @return the fetched positions wrapped in a KafkaSourceOffset
     */
    public KafkaSourceOffset fetchSpecificOffsets(scala.collection.immutable.Map<TopicPartition, Object> partitionOffsets, Function1<String, BoxedUnit> reportDataLoss) {
        scala.collection.immutable.Map fetched = (scala.collection.immutable.Map)this.runUninterruptibly((Function0 & Serializable & scala.Serializable)() -> this.withRetriesWithoutInterrupt((Function0<scala.collection.immutable.Map<TopicPartition, Object>>)(Function0 & Serializable & scala.Serializable)() -> {
            this.consumer().poll(0L);
            Set partitions = this.consumer().assignment();
            // position() is called for every partition and the results are discarded.
            // NOTE(review): presumably this waits for offset lookups triggered by poll(0) - confirm.
            ((IterableLike)((SetLike)JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala()).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p), (Object)BoxesRunTime.boxToLong((long)this.consumer().position(p))), Set$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
                KafkaOffsetReader.$anonfun$fetchSpecificOffsets$4(x$1);
                return BoxedUnit.UNIT;
            });
            this.consumer().pause((Collection)partitions);
            Object object = JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala();
            scala.collection.immutable.Set set = partitionOffsets.keySet();
            // Null-safe equality between the assigned partitions and the requested ones.
            Predef$.MODULE$.assert(!(object != null ? !object.equals(set) : set != null), (Function0 & Serializable & scala.Serializable)() -> new StringBuilder(139).append("If startingOffsets contains specific offsets, you must specify all TopicPartitions.\nUse -1 for latest, -2 for earliest, if you don't care.\n").append(new StringBuilder(22).append("Specified: ").append(partitionOffsets.keySet()).append(" Assigned: ").append(JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala()).toString()).toString());
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Partitions assigned to consumer: ").append(partitions).append(". Seeking to ").append(partitionOffsets).toString());
            partitionOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                KafkaOffsetReader.$anonfun$fetchSpecificOffsets$7(this, x0$1);
                return BoxedUnit.UNIT;
            });
            // Read back the position of every requested partition after seeking.
            return (scala.collection.immutable.Map)partitionOffsets.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
                Tuple2 tuple2 = x0$2;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                TopicPartition tp = (TopicPartition)tuple2._1();
                Tuple2 tuple22 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)BoxesRunTime.boxToLong((long)this.consumer().position(tp)));
                return tuple22;
            }, Map$.MODULE$.canBuildFrom());
        }));
        partitionOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$3 -> {
            KafkaOffsetReader.$anonfun$fetchSpecificOffsets$9(reportDataLoss, fetched, x0$3);
            return BoxedUnit.UNIT;
        });
        return new KafkaSourceOffset((scala.collection.immutable.Map<TopicPartition, Object>)fetched);
    }

    /**
     * Pauses all assigned partitions, seeks them to the beginning and returns the resulting
     * position of each partition.
     */
    public scala.collection.immutable.Map<TopicPartition, Object> fetchEarliestOffsets() {
        return (scala.collection.immutable.Map)this.runUninterruptibly((Function0 & Serializable & scala.Serializable)() -> this.withRetriesWithoutInterrupt((Function0<scala.collection.immutable.Map<TopicPartition, Object>>)(Function0 & Serializable & scala.Serializable)() -> {
            this.consumer().poll(0L);
            Set partitions = this.consumer().assignment();
            this.consumer().pause((Collection)partitions);
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(59).append("Partitions assigned to consumer: ").append(partitions).append(". Seeking to the beginning").toString());
            this.consumer().seekToBeginning((Collection)partitions);
            scala.collection.immutable.Map partitionOffsets = ((TraversableOnce)((SetLike)JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala()).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p), (Object)BoxesRunTime.boxToLong((long)this.consumer().position(p))), Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(37).append("Got earliest offsets for partition : ").append(partitionOffsets).toString());
            // The decompiler returned an uninitialised placeholder here; the computed map is the result.
            return partitionOffsets;
        }));
    }

    /**
     * Seeks all assigned partitions to the end and returns the resulting position of each.
     *
     * When {@code knownOffsets} is defined, the fetch is repeated (up to the configured number of
     * attempts, sleeping the configured interval in between) for as long as some partition
     * reports an offset lower than its known offset. The offsets from the last attempt are
     * returned even if they are still considered incorrect.
     *
     * @param knownOffsets previously observed offsets per partition, if any
     */
    public scala.collection.immutable.Map<TopicPartition, Object> fetchLatestOffsets(Option<scala.collection.immutable.Map<TopicPartition, Object>> knownOffsets) {
        return (scala.collection.immutable.Map)this.runUninterruptibly((Function0 & Serializable & scala.Serializable)() -> this.withRetriesWithoutInterrupt((Function0<scala.collection.immutable.Map<TopicPartition, Object>>)(Function0 & Serializable & scala.Serializable)() -> {
            scala.collection.immutable.Map map;
            this.consumer().poll(0L);
            Set partitions = this.consumer().assignment();
            // position() is called for every partition and the results are discarded.
            // NOTE(review): presumably this waits for offset lookups triggered by poll(0) - confirm.
            ((IterableLike)((SetLike)JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala()).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p), (Object)BoxesRunTime.boxToLong((long)this.consumer().position(p))), Set$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
                KafkaOffsetReader.$anonfun$fetchLatestOffsets$4(x$2);
                return BoxedUnit.UNIT;
            });
            this.consumer().pause((Collection)partitions);
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("Partitions assigned to consumer: ").append(partitions).append(". Seeking to the end.").toString());
            if (knownOffsets.isEmpty()) {
                // Nothing to validate against: a single fetch is enough.
                this.consumer().seekToEnd((Collection)partitions);
                map = ((TraversableOnce)((SetLike)JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala()).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p), (Object)BoxesRunTime.boxToLong((long)this.consumer().position(p))), Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
            } else {
                ObjectRef partitionOffsets = ObjectRef.create((Object)Predef$.MODULE$.Map().empty());
                ObjectRef incorrectOffsets = ObjectRef.create((Object)Nil$.MODULE$);
                int attempt = 0;
                do {
                    this.consumer().seekToEnd((Collection)partitions);
                    partitionOffsets.elem = ((TraversableOnce)((SetLike)JavaConverters$.MODULE$.asScalaSetConverter(partitions).asScala()).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p), (Object)BoxesRunTime.boxToLong((long)this.consumer().position(p))), Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
                    ++attempt;
                    incorrectOffsets.elem = KafkaOffsetReader.findIncorrectOffsets$1(knownOffsets, partitionOffsets);
                    if (((Seq)incorrectOffsets.elem).nonEmpty()) {
                        this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("Found incorrect offsets in some partitions ").append(new StringBuilder(46).append("(partition, previous offset, fetched offset): ").append((Seq)incorrectOffsets.elem).toString()).toString());
                        // Only wait when another attempt will actually follow.
                        if (attempt < this.maxOffsetFetchAttempts()) {
                            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Retrying to fetch latest offsets because of incorrect offsets");
                            Thread.sleep(this.offsetFetchAttemptIntervalMs());
                        }
                    }
                } while (((Seq)incorrectOffsets.elem).nonEmpty() && attempt < this.maxOffsetFetchAttempts());
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(35).append("Got latest offsets for partition : ").append((scala.collection.immutable.Map)partitionOffsets.elem).toString());
                map = (scala.collection.immutable.Map)partitionOffsets.elem;
            }
            return map;
        }));
    }

    /**
     * Returns the earliest position of each partition in {@code newPartitions} that is currently
     * assigned to the consumer. Returns an empty map without contacting Kafka when
     * {@code newPartitions} is empty.
     *
     * @param newPartitions partitions whose earliest offsets are wanted
     */
    public scala.collection.immutable.Map<TopicPartition, Object> fetchEarliestOffsets(Seq<TopicPartition> newPartitions) {
        return newPartitions.isEmpty() ? Predef$.MODULE$.Map().empty() : (scala.collection.immutable.Map)this.runUninterruptibly((Function0 & Serializable & scala.Serializable)() -> this.withRetriesWithoutInterrupt((Function0<scala.collection.immutable.Map<TopicPartition, Object>>)(Function0 & Serializable & scala.Serializable)() -> {
            this.consumer().poll(0L);
            Set partitions = this.consumer().assignment();
            this.consumer().pause((Collection)partitions);
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(34).append("\tPartitions assigned to consumer: ").append(partitions).toString());
            this.consumer().seekToBeginning((Collection)partitions);
            // Requested partitions that are not in the current assignment are silently dropped.
            scala.collection.immutable.Map partitionOffsets = ((TraversableOnce)((TraversableLike)newPartitions.filter((Function1 & Serializable & scala.Serializable)p -> BoxesRunTime.boxToBoolean((boolean)partitions.contains(p)))).map((Function1 & Serializable & scala.Serializable)p -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(p), (Object)BoxesRunTime.boxToLong((long)this.consumer().position(p))), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(41).append("Got earliest offsets for new partitions: ").append(partitionOffsets).toString());
            // The decompiler returned an uninitialised placeholder here; the computed map is the result.
            return partitionOffsets;
        }));
    }

    /**
     * Evaluates {@code body} on an UninterruptibleThread: inline when the caller already runs on
     * one, otherwise as a Future on {@link #execContext()} that is awaited without a timeout.
     */
    private <T> T runUninterruptibly(Function0<T> body) {
        if (Thread.currentThread() instanceof UninterruptibleThread) {
            return (T)body.apply();
        }
        Future future = Future$.MODULE$.apply(body, (ExecutionContext)this.execContext());
        return (T)ThreadUtils$.MODULE$.awaitResult((Awaitable)future, (Duration)Duration$.MODULE$.Inf());
    }

    /**
     * Runs {@code body} inside UninterruptibleThread.runUninterruptibly, retrying after non-fatal
     * failures until it succeeds, the attempt limit is exceeded or the thread is interrupted.
     *
     * After each non-fatal failure the exception is remembered and logged, the method sleeps for
     * the configured interval and the consumer is reset so the next attempt builds a new one.
     * Throwables that NonFatal does not match propagate immediately.
     *
     * Throws InterruptedException if the thread's interrupt flag is set once the loop ends,
     * IllegalStateException if the current thread is not an UninterruptibleThread, and the last
     * recorded exception if every attempt failed.
     */
    private scala.collection.immutable.Map<TopicPartition, Object> withRetriesWithoutInterrupt(Function0<scala.collection.immutable.Map<TopicPartition, Object>> body) {
        Predef$.MODULE$.assert(Thread.currentThread() instanceof UninterruptibleThread);
        synchronized (this) {
            ObjectRef result = ObjectRef.create((Object)None$.MODULE$);
            IntRef attempt = IntRef.create((int)1);
            ObjectRef lastException = ObjectRef.create(null);
            while (((Option)result.elem).isEmpty() && attempt.elem <= this.maxOffsetFetchAttempts() && !Thread.currentThread().isInterrupted()) {
                Thread thread = Thread.currentThread();
                if (!(thread instanceof UninterruptibleThread)) {
                    throw new IllegalStateException("Kafka APIs must be executed on a o.a.spark.util.UninterruptibleThread");
                }
                UninterruptibleThread uninterruptibleThread = (UninterruptibleThread)thread;
                uninterruptibleThread.runUninterruptibly((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                    try {
                        result.elem = new Some(body.apply());
                    }
                    catch (Throwable throwable) {
                        Option option = NonFatal$.MODULE$.unapply(throwable);
                        if (option.isEmpty()) {
                            // Not matched by NonFatal: never retry, propagate unchanged.
                            throw throwable;
                        }
                        Throwable e = (Throwable)option.get();
                        lastException.elem = e;
                        this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(41).append("Error in attempt ").append(attempt.elem).append(" getting Kafka offsets: ").toString(), e);
                        ++attempt.elem;
                        Thread.sleep(this.offsetFetchAttemptIntervalMs());
                        // Drop the current consumer so the next attempt starts with a new one.
                        this.resetConsumer();
                    }
                });
            }
            // Thread.interrupted() also clears the flag before the exception is raised.
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (((Option)result.elem).isEmpty()) {
                Predef$.MODULE$.assert(attempt.elem > this.maxOffsetFetchAttempts());
                Predef$.MODULE$.assert((Throwable)lastException.elem != null);
                throw (Throwable)lastException.elem;
            }
            return (scala.collection.immutable.Map)((Option)result.elem).get();
        }
    }

    /** Closes the current consumer, if any. The field itself is left untouched. */
    private synchronized void stopConsumer() {
        Predef$.MODULE$.assert(Thread.currentThread() instanceof UninterruptibleThread);
        if (this._consumer() != null) {
            this._consumer().close();
        }
    }

    /** Closes the current consumer and clears the field so {@link #consumer()} creates a new one. */
    private synchronized void resetConsumer() {
        this.stopConsumer();
        this._consumer_$eq(null);
    }

    // ---------------------------------------------------------------------------------------
    // Synthetic lambda bodies emitted by the Scala compiler.
    // ---------------------------------------------------------------------------------------

    /** Intentionally empty: consumes the (partition, position) pairs in fetchSpecificOffsets. */
    public static final /* synthetic */ void $anonfun$fetchSpecificOffsets$4(Tuple2 x$1) {
    }

    /**
     * Seeks one partition according to its requested offset: to the end for LATEST, to the
     * beginning for EARLIEST, otherwise to the explicit offset. Throws MatchError for a null pair.
     */
    public static final /* synthetic */ void $anonfun$fetchSpecificOffsets$7(KafkaOffsetReader $this, Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        TopicPartition tp = (TopicPartition)tuple2._1();
        long off = tuple2._2$mcJ$sp();
        if (KafkaOffsetRangeLimit$.MODULE$.LATEST() == off) {
            $this.consumer().seekToEnd(Arrays.asList(tp));
        } else if (KafkaOffsetRangeLimit$.MODULE$.EARLIEST() == off) {
            $this.consumer().seekToBeginning(Arrays.asList(tp));
        } else {
            $this.consumer().seek(tp, off);
        }
    }

    /**
     * Reports data loss for one partition when an explicit offset was requested and the fetched
     * position differs from it. LATEST/EARLIEST requests and null pairs are ignored.
     */
    public static final /* synthetic */ void $anonfun$fetchSpecificOffsets$9(Function1 reportDataLoss$1, scala.collection.immutable.Map fetched$1, Tuple2 x0$3) {
        Tuple2 tuple2 = x0$3;
        if (tuple2 == null) {
            return;
        }
        TopicPartition tp = (TopicPartition)tuple2._1();
        long off = tuple2._2$mcJ$sp();
        if (off == KafkaOffsetRangeLimit$.MODULE$.LATEST() || off == KafkaOffsetRangeLimit$.MODULE$.EARLIEST()) {
            return;
        }
        if (BoxesRunTime.unboxToLong((Object)fetched$1.apply((Object)tp)) != off) {
            reportDataLoss$1.apply((Object)new StringBuilder(48).append("startingOffsets for ").append(tp).append(" was ").append(off).append(" but consumer reset to ").append(fetched$1.apply((Object)tp)).toString());
        }
    }

    /** Intentionally empty: consumes the (partition, position) pairs in fetchLatestOffsets. */
    public static final /* synthetic */ void $anonfun$fetchLatestOffsets$4(Tuple2 x$2) {
    }

    /**
     * Appends (partition, known offset, fetched offset) to the buffer when the known offset is
     * greater than the fetched one.
     */
    public static final /* synthetic */ Object $anonfun$fetchLatestOffsets$9(ObjectRef incorrectOffsets$2, TopicPartition tp$1, long offset$1, long knownOffset) {
        if (knownOffset > offset$1) {
            Tuple3 incorrectOffset = new Tuple3((Object)tp$1, (Object)BoxesRunTime.boxToLong((long)knownOffset), (Object)BoxesRunTime.boxToLong((long)offset$1));
            return ((ArrayBuffer)incorrectOffsets$2.elem).$plus$eq((Object)incorrectOffset);
        }
        return BoxedUnit.UNIT;
    }

    /** Looks up the known offset of {@code tp$1}, if present, and checks it against the fetched offset. */
    public static final /* synthetic */ void $anonfun$fetchLatestOffsets$8(ObjectRef incorrectOffsets$2, TopicPartition tp$1, long offset$1, scala.collection.immutable.Map x$3) {
        x$3.get((Object)tp$1).foreach((Function1 & Serializable & scala.Serializable)knownOffset -> KafkaOffsetReader.$anonfun$fetchLatestOffsets$9(incorrectOffsets$2, tp$1, offset$1, BoxesRunTime.unboxToLong((Object)knownOffset)));
    }

    /** Checks one fetched (partition, offset) pair against the known offsets, if any are defined. */
    public static final /* synthetic */ void $anonfun$fetchLatestOffsets$7(Option knownOffsets$1, ObjectRef incorrectOffsets$2, Tuple2 x0$4) {
        Tuple2 tuple2 = x0$4;
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        TopicPartition tp = (TopicPartition)tuple2._1();
        long offset = tuple2._2$mcJ$sp();
        knownOffsets$1.foreach((Function1 & Serializable & scala.Serializable)x$3 -> {
            KafkaOffsetReader.$anonfun$fetchLatestOffsets$8(incorrectOffsets$2, tp, offset, x$3);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Collects every partition whose fetched offset is lower than its known offset, as
     * (partition, known offset, fetched offset) triples.
     */
    private static final Seq findIncorrectOffsets$1(Option knownOffsets$1, ObjectRef partitionOffsets$3) {
        ObjectRef incorrectOffsets = ObjectRef.create((Object)((ArrayBuffer)ArrayBuffer$.MODULE$.apply((Seq)Nil$.MODULE$)));
        ((scala.collection.immutable.Map)partitionOffsets$3.elem).foreach((Function1 & Serializable & scala.Serializable)x0$4 -> {
            KafkaOffsetReader.$anonfun$fetchLatestOffsets$7(knownOffsets$1, incorrectOffsets, x0$4);
            return BoxedUnit.UNIT;
        });
        return (ArrayBuffer)incorrectOffsets.elem;
    }

    /**
     * Creates a reader. No consumer is created here; that happens on the first call to
     * {@link #consumer()}.
     *
     * @param consumerStrategy    factory used to create the Kafka consumer
     * @param driverKafkaParams   base Kafka params; copied, with "group.id" overridden, per consumer
     * @param readerOptions       options map; "fetchOffset.numRetries" and
     *                            "fetchOffset.retryIntervalMs" are read from it
     * @param driverGroupIdPrefix prefix of the generated consumer group ids
     */
    public KafkaOffsetReader(ConsumerStrategy consumerStrategy, Map<String, Object> driverKafkaParams, scala.collection.immutable.Map<String, String> readerOptions, String driverGroupIdPrefix) {
        this.consumerStrategy = consumerStrategy;
        this.driverKafkaParams = driverKafkaParams;
        this.driverGroupIdPrefix = driverGroupIdPrefix;
        Logging.$init$((Logging)this);
        // Single worker thread that is an UninterruptibleThread, so Kafka calls submitted through
        // execContext satisfy the thread assertions in this class.
        this.kafkaReaderThread = Executors.newSingleThreadExecutor(new ThreadFactory(){
            @Override
            public Thread newThread(Runnable r) {
                UninterruptibleThread t = new UninterruptibleThread("Kafka Offset Reader"){
                    @Override
                    public void run() {
                        r.run();
                    }
                };
                t.setDaemon(true);
                return t;
            }
        });
        this.execContext = ExecutionContext$.MODULE$.fromExecutorService(this.kafkaReaderThread());
        this.groupId = null;
        this.nextId = 0;
        this._consumer = null;
        this.maxOffsetFetchAttempts = new StringOps(Predef$.MODULE$.augmentString((String)readerOptions.getOrElse((Object)"fetchOffset.numRetries", (Function0 & Serializable & scala.Serializable)() -> "3"))).toInt();
        this.offsetFetchAttemptIntervalMs = new StringOps(Predef$.MODULE$.augmentString((String)readerOptions.getOrElse((Object)"fetchOffset.retryIntervalMs", (Function0 & Serializable & scala.Serializable)() -> "1000"))).toLong();
    }
}

