/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.utils.kafka;

import ai.starlake.config.Settings;
import ai.starlake.schema.model.Mode;
import ai.starlake.schema.model.Mode$;
import ai.starlake.schema.model.Mode$FILE$;
import ai.starlake.schema.model.Mode$STREAM$;
import ai.starlake.utils.FileLock;
import ai.starlake.utils.YamlSerializer$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\t\u001db\u0001\u0002\u000f\u001e\u0001\u0019B\u0001b\u0013\u0001\u0003\u0002\u0003\u0006I\u0001\u0014\u0005\t=\u0002\u0011\t\u0011)A\u0006?\")1\r\u0001C\u0001I\"9!\u000e\u0001b\u0001\n\u0003Y\u0007B\u0002;\u0001A\u0003%A\u000eC\u0004v\u0001\t\u0007I\u0011\u0001<\t\u000f\u0005\u0015\u0001\u0001)A\u0005o\"I\u0011q\u0001\u0001C\u0002\u0013\u0005\u0011\u0011\u0002\u0005\t\u0003#\u0001\u0001\u0015!\u0003\u0002\f!I\u00111\u0003\u0001C\u0002\u0013\u0005\u0011Q\u0003\u0005\t\u0003G\u0001\u0001\u0015!\u0003\u0002\u0018!Q\u0011Q\u0005\u0001\t\u0006\u0004%\t!a\n\t\u000f\u0005m\u0002\u0001\"\u0001\u0002>!9\u0011Q\t\u0001\u0005\u0002\u0005\u001d\u0003bBA'\u0001\u0011\u0005\u0011q\n\u0005\b\u0003?\u0002A\u0011AA1\u0011\u001d\t\u0019\t\u0001C\u0001\u0003\u000bCq!!)\u0001\t\u0013\t\u0019\u000bC\u0004\u0002(\u0002!\t!!+\t\u000f\u0005U\u0006\u0001\"\u0003\u00028\"9\u0011\u0011\u0019\u0001\u0005\n\u0005\r\u0007bBAh\u0001\u0011%\u0011\u0011\u001b\u0005\b\u0003+\u0004A\u0011AAl\u0011\u001d\tY\u000e\u0001C\u0001\u0003;Dq!!:\u0001\t\u0003\t9\u000fC\u0004\u0003\u0016\u0001!\tAa\u0006\t\u000f\tu\u0001\u0001\"\u0001\u0003 \tY1*\u00194lC\u000ec\u0017.\u001a8u\u0015\tqr$A\u0003lC\u001a\\\u0017M\u0003\u0002!C\u0005)Q\u000f^5mg*\u0011!eI\u0001\tgR\f'\u000f\\1lK*\tA%\u0001\u0002bS\u000e\u00011#\u0002\u0001([]\u001a\u0005C\u0001\u0015,\u001b\u0005I#\"\u0001\u0016\u0002\u000bM\u001c\u0017\r\\1\n\u00051J#AB!osJ+g\r\u0005\u0002/k5\tqF\u0003\u00021c\u0005a1oY1mC2|wmZ5oO*\u0011!gM\u0001\tif\u0004Xm]1gK*\tA'A\u0002d_6L!AN\u0018\u0003\u001bM#(/[2u\u0019><w-\u001b8h!\tA\u0014)D\u0001:\u0015\tQ4(A\u0002tc2T!\u0001P\u001f\u0002\u000bM\u0004\u0018M]6\u000b\u0005yz\u0014AB1qC\u000eDWMC\u0001A\u0003\ry'oZ\u0005\u0003\u0005f\u0012a\u0002R1uCN,G\u000fT8hO&tw\r\u0005\u0002E\u00136\tQI\u0003\u0002G\u000f\u0006!A.\u00198h\u0015\u0005A\u0015\u0001\u00026bm\u0006L!AS#\u0003\u001b\u0005+Ho\\\"m_N,\u0017M\u00197f\u0003-Y\u0017MZ6b\u0007>tg-[4\u0011\u00055[fB\u0001(Y\u001d\tyeK\u0004\u0002Q+:\u0011\u0011\u000bV\u0007\u0002%*\u00111+J\u0001\u0007yI|w\u000e\u001e \n\u0003\u0011J!AI\u0012\n\u0005]\u000b\u0013AB2p]\u001aLw-\u0003\u0002Z5\u0006A1+\u001a;uS:<7O\u0003\u0002XC%\u0011A,\u0018\u0002\f\u0017\u000647.Y\"p]\u001aLwM\u0003\u0002Z5\u0006A1/\u001a;uS:<7\u000f\u0005\u0002aC6\t!,\u0003\u0002c5\nA1+\u001a;uS:<7/\u0001\u0004=S:LGO\u0010\u000b\u0003K&$\"A\u001a5\u0011\u0005\u001d\u0004Q\"A\u000f\t\u000by\u001b\u00019A0\t\u000b-\u001b\u0001\u0019\u0001'\u0002!\r|W.\u001a;PM\u001a\u001cX\r^:N_\u0012,W#\u00017\u0011\u00055\u0014X\"\u00018\u000b\u0005=\u0004\u0018!B7pI\u0016d'BA9\"\u0003\u0019\u00198\r[3nC&\u00111O\u001c\u0002\u0005\u001b>$W-A\td_6,Go\u00144gg\u0016$8/T8eK\u0002\nQb]3sm\u0016\u0014x\n\u001d;j_:\u001cX#A<\u0011\tadxp \b\u0003sj\u0004\"!U\u0015\n\u0005mL\u0013A\u0002)sK\u0012,g-\u0003\u0002~}\n\u0019Q*\u00199\u000b\u0005mL\u0003c\u0001=\u0002\u0002%\u0019\u00111\u0001@\u0003\rM#(/\u001b8h\u00039\u0019XM\u001d<fe>\u0003H/[8og\u0002\n!cY8nKR|eMZ:fiN\u001cuN\u001c4jOV\u0011\u00111\u0002\t\u0004\u001b\u00065\u0011bAA\b;\n\u00012*\u00194lCR{\u0007/[2D_:4\u0017nZ\u0001\u0014G>lW\r^(gMN,Go]\"p]\u001aLw\rI\u0001\u0006aJ|\u0007o]\u000b\u0003\u0003/\u0001B!!\u0007\u0002 5\u0011\u00111\u0004\u0006\u0004\u0003;9\u0015\u0001B;uS2LA!!\t\u0002\u001c\tQ\u0001K]8qKJ$\u0018.Z:\u0002\rA\u0014x\u000e]:!\u0003\u0019\u0019G.[3oiV\u0011\u0011\u0011\u0006\t\u0005\u0003W\t9$\u0004\u0002\u0002.)!\u0011qFA\u0019\u0003\u0015\tG-\\5o\u0015\u0011\t\u0019$!\u000e\u0002\u000f\rd\u0017.\u001a8ug*\u0011a$P\u0005\u0005\u0003s\tiCA\u0006BI6Lgn\u00117jK:$\u0018!B2m_N,GCAA !\rA\u0013\u0011I\u0005\u0004\u0003\u0007J#\u0001B+oSR\f1\u0002Z3mKR,Gk\u001c9jGR!\u0011qHA%\u0011\u0019\tYE\u0004a\u0001\u007f\u0006IAo\u001c9jG:\u000bW.Z\u0001\u0018GJ,\u0017\r^3U_BL7-\u00134O_R\u0004&/Z:f]R$b!a\u0010\u0002R\u0005m\u0003bBA*\u001f\u0001\u0007\u0011QK\u0001\u0006i>\u0004\u0018n\u0019\t\u0005\u0003W\t9&\u0003\u0003\u0002Z\u00055\"\u0001\u0003(foR{\u0007/[2\t\r\u0005us\u00021\u0001x\u0003\u0011\u0019wN\u001c4\u0002\u001fQ|\u0007/[2QCJ$\u0018\u000e^5p]N$B!a\u0019\u0002\u0002B1\u0011QMA8\u0003krA!a\u001a\u0002l9\u0019\u0011+!\u001b\n\u0003)J1!!\u001c*\u0003\u001d\u0001\u0018mY6bO\u0016LA!!\u001d\u0002t\t!A*[:u\u0015\r\ti'\u000b\t\u0005\u0003o\ni(\u0004\u0002\u0002z)!\u00111PA\u001b\u0003\u0019\u0019w.\\7p]&!\u0011qPA=\u0005I!v\u000e]5d!\u0006\u0014H/\u001b;j_:LeNZ8\t\r\u0005-\u0003\u00031\u0001\u0000\u0003=!x\u000e]5d\u000b:$wJ\u001a4tKR\u001cHCBAD\u00037\u000bi\n\u0005\u0004\u0002f\u0005=\u0014\u0011\u0012\t\bQ\u0005-\u0015qRAK\u0013\r\ti)\u000b\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u0007!\n\t*C\u0002\u0002\u0014&\u00121!\u00138u!\rA\u0013qS\u0005\u0004\u00033K#\u0001\u0002'p]\u001eDa!a\u0013\u0012\u0001\u0004y\bBBAP#\u0001\u0007q/A\u0007bG\u000e,7o](qi&|gn]\u0001\u000bEVLG\u000e\u001a)s_B\u001cH\u0003BA\f\u0003KCa!a(\u0013\u0001\u00049\u0018\u0001\u0005;pa&\u001c7+\u0019<f\u001f\u001a47/\u001a;t)!\ty$a+\u00020\u0006E\u0006BBAW'\u0001\u0007q0A\bu_BL7mQ8oM&<g*Y7f\u0011\u0019\tyj\u0005a\u0001o\"9\u00111W\nA\u0002\u0005\u001d\u0015aB8gMN,Go]\u0001\u001ei>\u0004\u0018nY\"veJ,g\u000e^(gMN,Go\u001d$s_6\u001cFO]3b[R!\u0011\u0011XA`!\u0015A\u00131XAD\u0013\r\ti,\u000b\u0002\u0007\u001fB$\u0018n\u001c8\t\r\u00055F\u00031\u0001\u0000\u0003A\u0019w.\\3u\u001f\u001a47/\u001a;t\u0019>\u001c7\u000e\u0006\u0003\u0002F\u00065\u0007\u0003BAd\u0003\u0013l\u0011aH\u0005\u0004\u0003\u0017|\"\u0001\u0003$jY\u0016dunY6\t\r\u00055V\u00031\u0001\u0000\u0003m!x\u000e]5d\u0007V\u0014(/\u001a8u\u001f\u001a47/\u001a;t\rJ|WNR5mKR!\u0011\u0011XAj\u0011\u0019\tiK\u0006a\u0001\u007f\u0006\u0019Bo\u001c9jG\u000e+(O]3oi>3gm]3ugR!\u0011\u0011XAm\u0011\u0019\tik\u0006a\u0001\u007f\u0006iqN\u001a4tKR\u001c\u0018i\u001d&t_:$b!a8\u0002b\u0006\r\b\u0003\u0002\u0015\u0002<~Da!a\u0013\u0019\u0001\u0004y\bbBAZ1\u0001\u0007\u0011qQ\u0001\u0012G>t7/^7f)>\u0004\u0018n\u0019\"bi\u000eDG\u0003CAu\u0005\u000f\u0011IAa\u0005\u0011\u000f!\nY)a;\u0002\bB!\u0011Q\u001eB\u0001\u001d\u0011\ty/a@\u000f\t\u0005E\u0018Q \b\u0005\u0003g\fYP\u0004\u0003\u0002v\u0006ehbA)\u0002x&\t\u0001)\u0003\u0002?\u007f%\u0011A(P\u0005\u0003umJ1!!\u001c:\u0013\u0011\u0011\u0019A!\u0002\u0003\u0013\u0011\u000bG/\u0019$sC6,'bAA7s!1\u0011QV\rA\u0002}DqAa\u0003\u001a\u0001\u0004\u0011i!A\u0004tKN\u001c\u0018n\u001c8\u0011\u0007a\u0012y!C\u0002\u0003\u0012e\u0012Ab\u00159be.\u001cVm]:j_:DaaV\rA\u0002\u0005-\u0011!F2p]N,X.\u001a+pa&\u001c7\u000b\u001e:fC6Lgn\u001a\u000b\u0007\u0003W\u0014IBa\u0007\t\u000f\t-!\u00041\u0001\u0003\u000e!1qK\u0007a\u0001\u0003\u0017\t1b]5oWR{Gk\u001c9jGR1\u0011q\bB\u0011\u0005GAaaV\u000eA\u0002\u0005-\u0001b\u0002B\u00137\u0001\u0007\u00111^\u0001\u0003I\u001a\u0004")
public class KafkaClient
implements StrictLogging,
DatasetLogging,
AutoCloseable {
    private AdminClient client;
    private final Settings settings;
    private final Mode cometOffsetsMode;
    private final scala.collection.immutable.Map<String, String> serverOptions;
    private final Settings.KafkaTopicConfig cometOffsetsConfig;
    private final Properties props;
    private final Logger logger;
    private volatile boolean bitmap$0;

    @Override
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> ds) {
        return DatasetLogging.DatasetHelper$(this, ds);
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    public Mode cometOffsetsMode() {
        return this.cometOffsetsMode;
    }

    public scala.collection.immutable.Map<String, String> serverOptions() {
        return this.serverOptions;
    }

    public Settings.KafkaTopicConfig cometOffsetsConfig() {
        return this.cometOffsetsConfig;
    }

    public Properties props() {
        return this.props;
    }

    private AdminClient client$lzycompute() {
        KafkaClient kafkaClient = this;
        synchronized (kafkaClient) {
            if (!this.bitmap$0) {
                this.client = AdminClient.create((Properties)this.props());
                this.bitmap$0 = true;
            }
        }
        return this.client;
    }

    public AdminClient client() {
        return !this.bitmap$0 ? this.client$lzycompute() : this.client;
    }

    @Override
    public void close() {
        this.client().close();
    }

    public void deleteTopic(String topicName) {
        block4: {
            BoxedUnit boxedUnit;
            BoxedUnit boxedUnit2;
            boolean found = ((Set)this.client().listTopics().names().get()).contains(topicName);
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(((TraversableOnce)JavaConverters$.MODULE$.asScalaSetConverter((Set)this.client().listTopics().names().get()).asScala()).toSet().mkString("\n"));
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            if (!found) break block4;
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Deleting topic {}", new Object[]{topicName});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            this.client().deleteTopics(JavaConverters$.MODULE$.asJavaCollectionConverter((Iterable)new .colon.colon((Object)topicName, (List)Nil$.MODULE$)).asJavaCollection());
        }
    }

    public void createTopicIfNotPresent(NewTopic topic, scala.collection.immutable.Map<String, String> conf) {
        block0: {
            boolean found = ((Set)this.client().listTopics().names().get()).contains(topic.name());
            if (found) break block0;
            this.client().createTopics(Collections.singleton(topic.configs((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(conf).asJava()))).all().get();
        }
    }

    public List<TopicPartitionInfo> topicPartitions(String topicName) {
        return ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(((TopicDescription)((java.util.Map)this.client().describeTopics(Collections.singleton(topicName)).all().get()).get(topicName)).partitions()).asScala()).toList();
    }

    public List<Tuple2<Object, Object>> topicEndOffsets(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        List value;
        Try try_ = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> {
            BoxedUnit boxedUnit;
            Properties props = this.buildProps(accessOptions);
            KafkaConsumer consumer = new KafkaConsumer(props);
            if (this.logger().underlying().isInfoEnabled()) {
                BoxedUnit boxedUnit2;
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("access options for topic {} ==>", new Object[]{topicName});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                ((IterableLike)JavaConverters$.MODULE$.propertiesAsScalaMapConverter(props).asScala()).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                    KafkaClient.$anonfun$topicEndOffsets$2(this, x0$1);
                    return BoxedUnit.UNIT;
                });
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            List partitions = ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topicName)).asScala()).map((Function1 & Serializable & scala.Serializable)info -> new TopicPartition(topicName, info.partition()), Buffer$.MODULE$.canBuildFrom())).toList();
            consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
            consumer.seekToEnd((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
            return (List)partitions.map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), consumer.position(p)), List$.MODULE$.canBuildFrom());
        });
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable e = failure.exception();
            e.printStackTrace();
            throw e;
        }
        if (!(try_ instanceof Success)) {
            throw new MatchError((Object)try_);
        }
        Success success = (Success)try_;
        List list = value = (List)success.value();
        return list;
    }

    /*
     * WARNING - void declaration
     */
    private Properties buildProps(scala.collection.immutable.Map<String, String> accessOptions) {
        void var2_2;
        Properties props = new Properties();
        accessOptions.foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        return var2_2;
    }

    public void topicSaveOffsets(String topicConfigName, scala.collection.immutable.Map<String, String> accessOptions, List<Tuple2<Object, Object>> offsets) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            Properties props = this.buildProps(accessOptions);
            KafkaProducer producer = new KafkaProducer(props);
            offsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                Tuple2 tuple2 = x0$1;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                int partition = tuple2._1$mcI$sp();
                long offset = tuple2._2$mcJ$sp();
                Future future = producer.send(new ProducerRecord(this.cometOffsetsConfig().topicName(), (Object)new StringBuilder(1).append(topicConfigName).append("/").append(partition).toString(), (Object)String.valueOf(BoxesRunTime.boxToLong((long)offset))));
                return future;
            });
            producer.close();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            FileLock qual$1 = this.cometOffsetsLock(topicConfigName);
            long x$1 = qual$1.doExclusively$default$1();
            JFunction0.mcV.sp & Serializable & scala.Serializable x$2 = (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                BoxedUnit boxedUnit;
                Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Saving comet offsets to path {}", new Object[]{cometOffsetsPath});
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                $this.settings.storageHandler().write(YamlSerializer$.MODULE$.serializeObject(offsets.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
                    Tuple2 tuple2 = x0$2;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    int partition = tuple2._1$mcI$sp();
                    long offset = tuple2._2$mcJ$sp();
                    String string = new StringBuilder(1).append(((Object)BoxesRunTime.boxToInteger((int)partition)).toString()).append(",").append(((Object)BoxesRunTime.boxToLong((long)offset)).toString()).toString();
                    return string;
                }, List$.MODULE$.canBuildFrom())), cometOffsetsPath);
            };
            BoxedUnit boxedUnit = (BoxedUnit)qual$1.doExclusively(x$1, x$2);
        } else {
            throw new Exception("Should never happen");
        }
    }

    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromStream(String topicConfigName) {
        Properties props = new Properties();
        this.cometOffsetsConfig().accessOptions().foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        KafkaConsumer consumer = new KafkaConsumer(props);
        List partitions = (List)this.topicPartitions(this.cometOffsetsConfig().topicName()).map((Function1 & Serializable & scala.Serializable)info -> new TopicPartition(this.cometOffsetsConfig().topicName(), info.partition()), List$.MODULE$.canBuildFrom());
        consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
        consumer.seekToBeginning((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
        scala.collection.mutable.Map offsets = Map$.MODULE$.empty();
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
        while (records != null && !records.isEmpty()) {
            ((IterableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(records.records(this.cometOffsetsConfig().topicName())).asScala()).foreach((Function1 & Serializable & scala.Serializable)r -> (scala.collection.mutable.Map)offsets.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(r.key()), r.value())));
            records = consumer.poll(Duration.ofMillis(100L));
        }
        Option res = ((TraversableLike)offsets.keys().map((Function1 & Serializable & scala.Serializable)k -> {
            String[] tab = new StringOps(Predef$.MODULE$.augmentString(k)).split('/');
            return new Tuple3((Object)tab[0], (Object)tab[1], offsets.apply(k));
        }, Iterable$.MODULE$.canBuildFrom())).groupBy((Function1 & Serializable & scala.Serializable)x0$1 -> {
            String topic;
            Tuple3 tuple3 = x0$1;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            String string = topic = (String)tuple3._1();
            return string;
        }).mapValues((Function1 & Serializable & scala.Serializable)x$1 -> ((TraversableOnce)x$1.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple3 tuple3 = x0$2;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            String partition = (String)tuple3._2();
            String offset = (String)tuple3._3();
            Tuple2.mcIJ.sp sp2 = new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(partition)).toInt(), new StringOps(Predef$.MODULE$.augmentString(offset)).toLong());
            return sp2;
        }, Iterable$.MODULE$.canBuildFrom())).toList()).get((Object)topicConfigName);
        return res;
    }

    private FileLock cometOffsetsLock(String topicConfigName) {
        Path lockPath = new Path(this.settings.comet().lock().path(), new StringBuilder(19).append("comet_offsets_").append(topicConfigName).append(".lock").toString());
        return new FileLock(lockPath, this.settings.storageHandler());
    }

    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromFile(String topicConfigName) {
        FileLock qual$1 = this.cometOffsetsLock(topicConfigName);
        long x$1 = qual$1.doExclusively$default$1();
        Function0 & Serializable & scala.Serializable x$2 = (Function0 & Serializable & scala.Serializable)() -> {
            None$ none$;
            Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
            if ($this.settings.storageHandler().exists(cometOffsetsPath)) {
                BoxedUnit boxedUnit;
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Loading comet offsets to path {}", new Object[]{cometOffsetsPath});
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                List res = (List)((List)YamlSerializer$.MODULE$.mapper().readValue($this.settings.storageHandler().read(cometOffsetsPath), List.class)).map((Function1 & Serializable & scala.Serializable)str -> {
                    String[] tab = new StringOps(Predef$.MODULE$.augmentString(str)).split(',');
                    return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(tab[0])).toInt(), new StringOps(Predef$.MODULE$.augmentString(tab[1])).toLong());
                }, List$.MODULE$.canBuildFrom());
                none$ = new Some((Object)res);
            } else {
                BoxedUnit boxedUnit;
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Cannot load comet offsets: {} file does not exist", new Object[]{cometOffsetsPath});
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                none$ = None$.MODULE$;
            }
            return none$;
        };
        return (Option)qual$1.doExclusively(x$1, x$2);
    }

    public Option<List<Tuple2<Object, Object>>> topicCurrentOffsets(String topicConfigName) {
        Option<List<Tuple2<Object, Object>>> option;
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            option = this.topicCurrentOffsetsFromStream(topicConfigName);
        } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            option = this.topicCurrentOffsetsFromFile(topicConfigName);
        } else {
            throw new Exception("Should never happen");
        }
        return option;
    }

    public Option<String> offsetsAsJson(String topicName, List<Tuple2<Object, Object>> offsets) {
        None$ none$;
        if (offsets.isEmpty()) {
            none$ = None$.MODULE$;
        } else {
            String offsetsAsString = ((TraversableOnce)offsets.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
                Tuple2 tuple2 = x0$1;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                int partition = tuple2._1$mcI$sp();
                long partitionOffset = tuple2._2$mcJ$sp();
                String string = new StringBuilder(4).append("\"").append(partition).append("\": ").append(partitionOffset).toString();
                return string;
            }, List$.MODULE$.canBuildFrom())).mkString(",");
            none$ = new Some((Object)new StringBuilder(7).append("{\"").append(topicName).append("\":{").append(offsetsAsString).append("}}").toString());
        }
        return none$;
    }

    public Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch(String topicConfigName, SparkSession session, Settings.KafkaTopicConfig config) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        BoxedUnit boxedUnit3;
        long EARLIEST_OFFSET = -2L;
        List startOffsets = (List)this.topicCurrentOffsets(topicConfigName).getOrElse((Function0 & Serializable & scala.Serializable)() -> (List)this.topicPartitions(config.topicName()).map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), EARLIEST_OFFSET), List$.MODULE$.canBuildFrom()));
        if (this.logger().underlying().isInfoEnabled()) {
            startOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                KafkaClient.$anonfun$consumeTopicBatch$3(this, topicConfigName, x0$1);
                return BoxedUnit.UNIT;
            });
            boxedUnit3 = BoxedUnit.UNIT;
        } else {
            boxedUnit3 = BoxedUnit.UNIT;
        }
        List<Tuple2<Object, Object>> endOffsets = this.topicEndOffsets(config.topicName(), config.accessOptions());
        if (this.logger().underlying().isInfoEnabled()) {
            endOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                KafkaClient.$anonfun$consumeTopicBatch$4(this, topicConfigName, x0$2);
                return BoxedUnit.UNIT;
            });
            boxedUnit2 = BoxedUnit.UNIT;
        } else {
            boxedUnit2 = BoxedUnit.UNIT;
        }
        scala.collection.immutable.Map withOffsetsTopicOptions = config.accessOptions().$plus$plus((GenTraversableOnce)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"startingOffsets"), this.offsetsAsJson(config.topicName(), (List<Tuple2<Object, Object>>)startOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "earliest")), (List)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"endingOffsets"), this.offsetsAsJson(config.topicName(), endOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "latest")), (List)Nil$.MODULE$)));
        DataFrameReader reader = session.read().format("kafka");
        Dataset df = reader.options((Map)withOffsetsTopicOptions).load().selectExpr((Seq)config.fields().map((Function1 & Serializable & scala.Serializable)x -> new StringBuilder(6).append("CAST(").append((String)x).append(")").toString(), List$.MODULE$.canBuildFrom()));
        if (this.logger().underlying().isInfoEnabled()) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.DatasetHelper(df).schemaString());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return new Tuple2((Object)df, endOffsets);
    }

    public Dataset<Row> consumeTopicStreaming(SparkSession session, Settings.KafkaTopicConfig config) {
        DataStreamReader reader = session.readStream().format("kafka");
        Dataset df = reader.options(config.accessOptions()).load().selectExpr((Seq)config.fields().map((Function1 & Serializable & scala.Serializable)x -> new StringBuilder(6).append("CAST(").append((String)x).append(")").toString(), List$.MODULE$.canBuildFrom()));
        return df;
    }

    public void sinkToTopic(Settings.KafkaTopicConfig config, Dataset<Row> df) {
        DataFrameWriter writer = df.selectExpr((Seq)config.fields().map((Function1 & Serializable & scala.Serializable)x -> new StringBuilder(6).append("CAST(").append((String)x).append(")").toString(), List$.MODULE$.canBuildFrom())).write().format("kafka");
        writer.options(config.accessOptions()).option("topic", config.topicName()).save();
    }

    public static final /* synthetic */ void $anonfun$topicEndOffsets$2(KafkaClient $this, Tuple2 x0$1) {
        BoxedUnit boxedUnit;
        Tuple2 tuple2 = x0$1;
        if (tuple2 != null) {
            String k = (String)tuple2._1();
            String v = (String)tuple2._2();
            if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info("\t{}={}", new Object[]{k, v});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            throw new MatchError((Object)tuple2);
        }
        BoxedUnit boxedUnit2 = boxedUnit;
    }

    public static final /* synthetic */ void $anonfun$consumeTopicBatch$3(KafkaClient $this, String topicConfigName$3, Tuple2 x0$1) {
        BoxedUnit boxedUnit;
        Tuple2 tuple2 = x0$1;
        if (tuple2 != null) {
            int partition = tuple2._1$mcI$sp();
            long offsetStart = tuple2._2$mcJ$sp();
            if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info("{} start-offset -> {}:{}", new Object[]{topicConfigName$3, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)offsetStart)});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            throw new MatchError((Object)tuple2);
        }
        BoxedUnit boxedUnit2 = boxedUnit;
    }

    public static final /* synthetic */ void $anonfun$consumeTopicBatch$4(KafkaClient $this, String topicConfigName$3, Tuple2 x0$2) {
        BoxedUnit boxedUnit;
        Tuple2 tuple2 = x0$2;
        if (tuple2 != null) {
            int partition = tuple2._1$mcI$sp();
            long offsetEnd = tuple2._2$mcJ$sp();
            if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info("{} end-offset -> {}:{}", new Object[]{topicConfigName$3, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)offsetEnd)});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            throw new MatchError((Object)tuple2);
        }
        BoxedUnit boxedUnit2 = boxedUnit;
    }

    public KafkaClient(Settings.KafkaConfig kafkaConfig, Settings settings) {
        BoxedUnit boxedUnit;
        this.settings = settings;
        StrictLogging.$init$((StrictLogging)this);
        DatasetLogging.$init$(this);
        this.cometOffsetsMode = (Mode)settings.comet().kafka().cometOffsetsMode().map((Function1 & Serializable & scala.Serializable)value -> Mode$.MODULE$.fromString((String)value)).getOrElse((Function0 & Serializable & scala.Serializable)() -> Mode$STREAM$.MODULE$);
        this.serverOptions = kafkaConfig.serverOptions();
        this.cometOffsetsConfig = (Settings.KafkaTopicConfig)kafkaConfig.topics().apply((Object)"comet_offsets");
        this.props = new Properties();
        this.serverOptions().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            String k = (String)tuple2._1();
            String v = (String)tuple2._2();
            Object object = this.props().put(k, v);
            return object;
        });
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            this.createTopicIfNotPresent(new NewTopic(this.cometOffsetsConfig().topicName(), this.cometOffsetsConfig().partitions(), this.cometOffsetsConfig().replicationFactor()), (scala.collection.immutable.Map<String, String>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)"compact")}))));
            boxedUnit = BoxedUnit.UNIT;
        } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            boxedUnit = !settings.storageHandler().exists(new Path(this.cometOffsetsConfig().topicName())) ? BoxesRunTime.boxToBoolean((boolean)settings.storageHandler().mkdirs(new Path(this.cometOffsetsConfig().topicName()))) : BoxedUnit.UNIT;
        } else {
            throw new Exception("Should never happen");
        }
    }
}

