/*
 * Decompiled with CFR 0.152.
 */
package com.ebiznext.comet.utils.kafka;

import com.ebiznext.comet.config.Settings;
import com.ebiznext.comet.schema.generator.YamlSerializer$;
import com.ebiznext.comet.schema.model.Mode;
import com.ebiznext.comet.schema.model.Mode$;
import com.ebiznext.comet.schema.model.Mode$FILE$;
import com.ebiznext.comet.schema.model.Mode$STREAM$;
import com.ebiznext.comet.utils.FileLock;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t%b\u0001B\u000f\u001f\u0001%B\u0001\u0002\u0011\u0001\u0003\u0002\u0003\u0006I!\u0011\u0005\t+\u0002\u0011\t\u0011)A\u0006-\")!\f\u0001C\u00017\"9\u0011\r\u0001b\u0001\n\u0003\u0011\u0007BB6\u0001A\u0003%1\rC\u0004m\u0001\t\u0007I\u0011A7\t\re\u0004\u0001\u0015!\u0003o\u0011\u001dQ\bA1A\u0005\u0002mDaa \u0001!\u0002\u0013a\b\"CA\u0001\u0001\t\u0007I\u0011AA\u0002\u0011!\t\t\u0002\u0001Q\u0001\n\u0005\u0015\u0001\"CA\n\u0001\t\u0007I\u0011AA\u000b\u0011!\t\t\u0004\u0001Q\u0001\n\u0005]\u0001bBA\u001a\u0001\u0011\u0005\u0011Q\u0007\u0005\b\u0003{\u0001A\u0011AA \u0011\u001d\t)\u0005\u0001C\u0001\u0003\u000fBq!a\u0016\u0001\t\u0003\tI\u0006C\u0004\u0002|\u0001!\t!! \t\u000f\u0005e\u0005\u0001\"\u0003\u0002\u001c\"9\u0011q\u0014\u0001\u0005\u0002\u0005\u0005\u0006bBAW\u0001\u0011%\u0011q\u0016\u0005\b\u0003s\u0003A\u0011BA^\u0011\u001d\t9\r\u0001C\u0005\u0003\u0013Dq!!4\u0001\t\u0003\ty\rC\u0004\u0002T\u0002!\t!!6\t\u000f\u0005u\u0007\u0001\"\u0001\u0002`\"9!q\u0003\u0001\u0005\u0002\te\u0001b\u0002B\u0010\u0001\u0011\u0005!\u0011\u0005\u0002\f\u0017\u000647.Y\"mS\u0016tGO\u0003\u0002 A\u0005)1.\u00194lC*\u0011\u0011EI\u0001\u0006kRLGn\u001d\u0006\u0003G\u0011\nQaY8nKRT!!\n\u0014\u0002\u0011\u0015\u0014\u0017N\u001f8fqRT\u0011aJ\u0001\u0004G>l7\u0001A\n\u0005\u0001)\u0002\u0004\b\u0005\u0002,]5\tAFC\u0001.\u0003\u0015\u00198-\u00197b\u0013\tyCF\u0001\u0004B]f\u0014VM\u001a\t\u0003cYj\u0011A\r\u0006\u0003gQ\nAb]2bY\u0006dwnZ4j]\u001eT!!\u000e\u0014\u0002\u0011QL\b/Z:bM\u0016L!a\u000e\u001a\u0003\u001bM#(/[2u\u0019><w-\u001b8h!\tId(D\u0001;\u0015\tYD(\u0001\u0003mC:<'\"A\u001f\u0002\t)\fg/Y\u0005\u0003\u007fi\u0012Q\"Q;u_\u000ecwn]3bE2,\u0017aC6bM.\f7i\u001c8gS\u001e\u0004\"A\u0011*\u000f\u0005\r{eB\u0001#N\u001d\t)EJ\u0004\u0002G\u0017:\u0011qIS\u0007\u0002\u0011*\u0011\u0011\nK\u0001\u0007yI|w\u000e\u001e 
\n\u0003\u001dJ!!\n\u0014\n\u0005\r\"\u0013B\u0001(#\u0003\u0019\u0019wN\u001c4jO&\u0011\u0001+U\u0001\t'\u0016$H/\u001b8hg*\u0011aJI\u0005\u0003'R\u00131bS1gW\u0006\u001cuN\u001c4jO*\u0011\u0001+U\u0001\tg\u0016$H/\u001b8hgB\u0011q\u000bW\u0007\u0002#&\u0011\u0011,\u0015\u0002\t'\u0016$H/\u001b8hg\u00061A(\u001b8jiz\"\"\u0001\u00181\u0015\u0005u{\u0006C\u00010\u0001\u001b\u0005q\u0002\"B+\u0004\u0001\b1\u0006\"\u0002!\u0004\u0001\u0004\t\u0015\u0001E2p[\u0016$xJ\u001a4tKR\u001cXj\u001c3f+\u0005\u0019\u0007C\u00013j\u001b\u0005)'B\u00014h\u0003\u0015iw\u000eZ3m\u0015\tA'%\u0001\u0004tG\",W.Y\u0005\u0003U\u0016\u0014A!T8eK\u0006\t2m\\7fi>3gm]3ug6{G-\u001a\u0011\u0002\u001bM,'O^3s\u001fB$\u0018n\u001c8t+\u0005q\u0007\u0003B8tmZt!\u0001]9\u0011\u0005\u001dc\u0013B\u0001:-\u0003\u0019\u0001&/\u001a3fM&\u0011A/\u001e\u0002\u0004\u001b\u0006\u0004(B\u0001:-!\tyw/\u0003\u0002yk\n11\u000b\u001e:j]\u001e\fab]3sm\u0016\u0014x\n\u001d;j_:\u001c\b%\u0001\nd_6,Go\u00144gg\u0016$8oQ8oM&<W#\u0001?\u0011\u0005\tk\u0018B\u0001@U\u0005AY\u0015MZ6b)>\u0004\u0018nY\"p]\u001aLw-A\nd_6,Go\u00144gg\u0016$8oQ8oM&<\u0007%A\u0003qe>\u00048/\u0006\u0002\u0002\u0006A!\u0011qAA\u0007\u001b\t\tIAC\u0002\u0002\fq\nA!\u001e;jY&!\u0011qBA\u0005\u0005)\u0001&o\u001c9feRLWm]\u0001\u0007aJ|\u0007o\u001d\u0011\u0002\r\rd\u0017.\u001a8u+\t\t9\u0002\u0005\u0003\u0002\u001a\u00055RBAA\u000e\u0015\u0011\ti\"a\b\u0002\u000b\u0005$W.\u001b8\u000b\t\u0005\u0005\u00121E\u0001\bG2LWM\u001c;t\u0015\ry\u0012Q\u0005\u0006\u0005\u0003O\tI#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0003\u0003W\t1a\u001c:h\u0013\u0011\ty#a\u0007\u0003\u0017\u0005#W.\u001b8DY&,g\u000e^\u0001\bG2LWM\u001c;!\u0003\u0015\u0019Gn\\:f)\t\t9\u0004E\u0002,\u0003sI1!a\u000f-\u0005\u0011)f.\u001b;\u0002\u0017\u0011,G.\u001a;f)>\u0004\u0018n\u0019\u000b\u0005\u0003o\t\t\u0005\u0003\u0004\u0002D=\u0001\rA^\u0001\ni>\u0004\u0018n\u0019(b[\u0016\fqc\u0019:fCR,Gk\u001c9jG&3gj\u001c;Qe\u0016\u001cXM\u001c;\u0015\r\u0005]\u0012\u0011JA*\u0011\u00
1d\tY\u0005\u0005a\u0001\u0003\u001b\nQ\u0001^8qS\u000e\u0004B!!\u0007\u0002P%!\u0011\u0011KA\u000e\u0005!qUm\u001e+pa&\u001c\u0007BBA+!\u0001\u0007a.\u0001\u0003d_:4\u0017a\u0004;pa&\u001c\u0007+\u0019:uSRLwN\\:\u0015\t\u0005m\u0013\u0011\u0010\t\u0007\u0003;\n9'!\u001c\u000f\t\u0005}\u00131\r\b\u0004\u000f\u0006\u0005\u0014\"A\u0017\n\u0007\u0005\u0015D&A\u0004qC\u000e\\\u0017mZ3\n\t\u0005%\u00141\u000e\u0002\u0005\u0019&\u001cHOC\u0002\u0002f1\u0002B!a\u001c\u0002v5\u0011\u0011\u0011\u000f\u0006\u0005\u0003g\n\u0019#\u0001\u0004d_6lwN\\\u0005\u0005\u0003o\n\tH\u0001\nU_BL7\rU1si&$\u0018n\u001c8J]\u001a|\u0007BBA\"#\u0001\u0007a/A\bu_BL7-\u00128e\u001f\u001a47/\u001a;t)\u0019\ty(a%\u0002\u0016B1\u0011QLA4\u0003\u0003\u0003raKAB\u0003\u000f\u000bi)C\u0002\u0002\u00062\u0012a\u0001V;qY\u0016\u0014\u0004cA\u0016\u0002\n&\u0019\u00111\u0012\u0017\u0003\u0007%sG\u000fE\u0002,\u0003\u001fK1!!%-\u0005\u0011auN\\4\t\r\u0005\r#\u00031\u0001w\u0011\u0019\t9J\u0005a\u0001]\u0006i\u0011mY2fgN|\u0005\u000f^5p]N\f!BY;jY\u0012\u0004&o\u001c9t)\u0011\t)!!(\t\r\u0005]5\u00031\u0001o\u0003A!x\u000e]5d'\u00064Xm\u00144gg\u0016$8\u000f\u0006\u0005\u00028\u0005\r\u0016qUAU\u0011\u0019\t)\u000b\u0006a\u0001m\u0006yAo\u001c9jG\u000e{gNZ5h\u001d\u0006lW\r\u0003\u0004\u0002\u0018R\u0001\rA\u001c\u0005\b\u0003W#\u0002\u0019AA@\u0003\u001dygMZ:fiN\fQ\u0004^8qS\u000e\u001cUO\u001d:f]R|eMZ:fiN4%o\\7TiJ,\u0017-\u001c\u000b\u0005\u0003c\u000b9\fE\u0003,\u0003g\u000by(C\u0002\u000262\u0012aa\u00149uS>t\u0007BBAS+\u0001\u0007a/\u0001\td_6,Go\u00144gg\u0016$8\u000fT8dWR!\u0011QXAc!\u0011\ty,!1\u000e\u0003\u0001J1!a1!\u0005!1\u0015\u000e\\3M_\u000e\\\u0007BBAS-\u0001\u0007a/A\u000eu_BL7mQ;se\u0016tGo\u00144gg\u0016$8O\u0012:p[\u001aKG.\u001a\u000b\u0005\u0003c\u000bY\r\u0003\u0004\u0002&^\u0001\rA^\u0001\u0014i>\u0004\u0018nY\"veJ,g\u000e^(gMN,Go\u001d\u000b\u0005\u0003c\u000b\t\u000e\u0003\u0004\u0002&b\u0001\rA^\u0001\u000e_\u001a47/\u001a;t\u0003NT5o\u001c8\u0015\r\u0005]\u0017\u0011\\An!\u0011Y
\u00131\u0017<\t\r\u0005\r\u0013\u00041\u0001w\u0011\u001d\tY+\u0007a\u0001\u0003\u007f\n\u0011cY8ogVlW\rV8qS\u000e\u0014\u0015\r^2i)!\t\tOa\u0002\u0003\n\tU\u0001cB\u0016\u0002\u0004\u0006\r\u0018q\u0010\t\u0005\u0003K\u0014\tA\u0004\u0003\u0002h\u0006uh\u0002BAu\u0003otA!a;\u0002t:!\u0011Q^Ay\u001d\r9\u0015q^\u0005\u0003\u0003WIA!a\n\u0002*%!\u0011Q_A\u0013\u0003\u0015\u0019\b/\u0019:l\u0013\u0011\tI0a?\u0002\u0007M\fHN\u0003\u0003\u0002v\u0006\u0015\u0012\u0002BA3\u0003\u007fTA!!?\u0002|&!!1\u0001B\u0003\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0003\u0002f\u0005}\bBBAS5\u0001\u0007a\u000fC\u0004\u0003\fi\u0001\rA!\u0004\u0002\u000fM,7o]5p]B!!q\u0002B\t\u001b\t\ty0\u0003\u0003\u0003\u0014\u0005}(\u0001D*qCJ\\7+Z:tS>t\u0007\"\u0002(\u001b\u0001\u0004a\u0018!F2p]N,X.\u001a+pa&\u001c7\u000b\u001e:fC6Lgn\u001a\u000b\u0007\u0003G\u0014YB!\b\t\u000f\t-1\u00041\u0001\u0003\u000e!)aj\u0007a\u0001y\u0006Y1/\u001b8l)>$v\u000e]5d)\u0019\t9Da\t\u0003&!)a\n\ba\u0001y\"9!q\u0005\u000fA\u0002\u0005\r\u0018A\u00013g\u0001")
public class KafkaClient
implements StrictLogging,
AutoCloseable {
    /** Application settings; read for the lock path and the storage handler. */
    private final Settings settings;
    /** Offsets persistence mode, resolved in the constructor (STREAM when not configured). */
    private final Mode cometOffsetsMode;
    /** Server options taken from the Kafka configuration; copied into {@link #props}. */
    private final scala.collection.immutable.Map<String, String> serverOptions;
    /** Configuration of the topic registered under the key "comet_offsets". */
    private final Settings.KafkaTopicConfig cometOffsetsConfig;
    /** Java properties built from {@link #serverOptions}; used to create the admin client. */
    private final Properties props;
    /** Kafka admin client created in the constructor and released by {@link #close()}. */
    private final AdminClient client;
    /** Logger installed through the StrictLogging setter below. */
    private final Logger logger;

    /**
     * Accessor for the logger held by this client.
     *
     * @return the logger instance stored on this object
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the given logger on this instance.
     *
     * <p>NOTE(review): presumably invoked by {@code StrictLogging.$init$} during
     * construction - confirm against the scala-logging trait encoding.
     *
     * @param x$1 the logger to install
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        logger = x$1;
    }

    /**
     * Accessor for the offsets persistence mode chosen in the constructor.
     *
     * @return the mode used to save and load comet offsets
     */
    public Mode cometOffsetsMode() {
        return cometOffsetsMode;
    }

    /**
     * Accessor for the server options read from the Kafka configuration.
     *
     * @return the immutable option map captured in the constructor
     */
    public scala.collection.immutable.Map<String, String> serverOptions() {
        return serverOptions;
    }

    /**
     * Accessor for the configuration of the "comet_offsets" topic.
     *
     * @return the topic configuration looked up in the constructor
     */
    public Settings.KafkaTopicConfig cometOffsetsConfig() {
        return cometOffsetsConfig;
    }

    /**
     * Accessor for the properties used to create the admin client.
     *
     * @return the (mutable) properties instance held by this client
     */
    public Properties props() {
        return props;
    }

    /**
     * Accessor for the Kafka admin client owned by this object.
     *
     * @return the admin client created in the constructor
     */
    public AdminClient client() {
        return client;
    }

    /** Closes the Kafka admin client owned by this object. */
    @Override
    public void close() {
        AdminClient admin = this.client();
        admin.close();
    }

    public void deleteTopic(String topicName) {
        block0: {
            boolean found = ((Set)this.client().listTopics().names().get()).contains(topicName);
            ((TraversableOnce)JavaConverters$.MODULE$.asScalaSetConverter((Set)this.client().listTopics().names().get()).asScala()).toSet().foreach((Function1 & Serializable & scala.Serializable)x -> {
                Predef$.MODULE$.println(x);
                return BoxedUnit.UNIT;
            });
            if (!found) break block0;
            this.client().deleteTopics(JavaConverters$.MODULE$.asJavaCollectionConverter((Iterable)new .colon.colon((Object)topicName, (List)Nil$.MODULE$)).asJavaCollection());
        }
    }

    /**
     * Creates {@code topic} with the given configuration unless a topic with the same
     * name is already listed, and waits for the creation to complete.
     *
     * @param topic the topic to create; its configs are set from {@code conf}
     * @param conf  topic-level configuration entries applied before creation
     */
    public void createTopicIfNotPresent(NewTopic topic, scala.collection.immutable.Map<String, String> conf) {
        Set existingNames = (Set)this.client().listTopics().names().get();
        if (existingNames.contains(topic.name())) {
            return;
        }
        Map javaConf = (Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(conf).asJava();
        NewTopic configured = topic.configs(javaConf);
        this.client().createTopics(Collections.singleton(configured)).all().get();
    }

    /**
     * Describes {@code topicName} through the admin client and returns its partitions.
     *
     * @param topicName the topic to describe
     * @return the topic's partition descriptors as a Scala list
     */
    public List<TopicPartitionInfo> topicPartitions(String topicName) {
        Map descriptions = (Map)this.client().describeTopics(Collections.singleton(topicName)).all().get();
        TopicDescription description = (TopicDescription)descriptions.get(topicName);
        return ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(description.partitions()).asScala()).toList();
    }

    /**
     * Returns the end position of every partition of {@code topicName}.
     *
     * <p>A temporary consumer is assigned all partitions, moved to their end, and asked
     * for its position on each one. The consumer is now always closed, including when a
     * Kafka call throws; previously it was never closed.
     *
     * @param topicName     the topic whose end offsets are wanted
     * @param accessOptions consumer properties used to build the temporary consumer
     * @return a list of (partition, end offset) pairs
     */
    public List<Tuple2<Object, Object>> topicEndOffsets(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        Properties props = this.buildProps(accessOptions);
        KafkaConsumer consumer = new KafkaConsumer(props);
        try {
            List partitions = ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topicName)).asScala()).map((Function1 & Serializable & scala.Serializable)info -> new TopicPartition(topicName, info.partition()), Buffer$.MODULE$.canBuildFrom())).toList();
            consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
            consumer.seekToEnd((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
            // List.map is strict, so every position is read before the consumer is closed.
            return (List)partitions.map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), consumer.position(p)), List$.MODULE$.canBuildFrom());
        } finally {
            consumer.close();
        }
    }

    /*
     * WARNING - void declaration
     */
    private Properties buildProps(scala.collection.immutable.Map<String, String> accessOptions) {
        void var2_2;
        Properties props = new Properties();
        accessOptions.foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        return var2_2;
    }

    public void topicSaveOffsets(String topicConfigName, scala.collection.immutable.Map<String, String> accessOptions, List<Tuple2<Object, Object>> offsets) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            Properties props = this.buildProps(accessOptions);
            KafkaProducer producer = new KafkaProducer(props);
            offsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                Tuple2 tuple2 = x0$1;
                if (tuple2 == null) {
                    throw new MatchError((Object)tuple2);
                }
                int partition = tuple2._1$mcI$sp();
                long offset = tuple2._2$mcJ$sp();
                Future future = producer.send(new ProducerRecord(this.cometOffsetsConfig().topicName(), (Object)new StringBuilder(1).append(topicConfigName).append("/").append(partition).toString(), (Object)String.valueOf(BoxesRunTime.boxToLong((long)offset))));
                return future;
            });
            producer.close();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            FileLock qual$1 = this.cometOffsetsLock(topicConfigName);
            long x$1 = qual$1.doExclusively$default$1();
            JFunction0.mcV.sp & Serializable & scala.Serializable x$2 = (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                BoxedUnit boxedUnit;
                Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Saving comet offsets to path {}", new Object[]{cometOffsetsPath});
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                $this.settings.storageHandler().write(YamlSerializer$.MODULE$.serializeObject(offsets.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
                    Tuple2 tuple2 = x0$2;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    int partition = tuple2._1$mcI$sp();
                    long offset = tuple2._2$mcJ$sp();
                    String string = new StringBuilder(1).append(((Object)BoxesRunTime.boxToInteger((int)partition)).toString()).append(",").append(((Object)BoxesRunTime.boxToLong((long)offset)).toString()).toString();
                    return string;
                }, List$.MODULE$.canBuildFrom())), cometOffsetsPath);
            };
            BoxedUnit boxedUnit = (BoxedUnit)qual$1.doExclusively(x$1, x$2);
        } else {
            throw new Exception("Should never happen");
        }
    }

    /**
     * Reads the saved offsets for {@code topicConfigName} from the comet offsets topic.
     *
     * <p>A temporary consumer reads the offsets topic from the beginning, polling until
     * a poll returns no records. Record keys have the form {@code "<topic>/<partition>"}
     * and values hold the offset; a later record for the same key replaces the earlier
     * one. The consumer is now always closed once reading is done; previously it was
     * never closed.
     *
     * @param topicConfigName logical topic name whose offsets are wanted
     * @return the (partition, offset) pairs recorded for that name, or empty when none exist
     */
    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromStream(String topicConfigName) {
        Properties props = new Properties();
        this.cometOffsetsConfig().accessOptions().foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        scala.collection.mutable.Map offsets = Map$.MODULE$.empty();
        KafkaConsumer consumer = new KafkaConsumer(props);
        try {
            List partitions = (List)this.topicPartitions(this.cometOffsetsConfig().topicName()).map((Function1 & Serializable & scala.Serializable)info -> new TopicPartition(this.cometOffsetsConfig().topicName(), info.partition()), List$.MODULE$.canBuildFrom());
            consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
            consumer.seekToBeginning((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)partitions).asJava());
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
            while (records != null && !records.isEmpty()) {
                ((IterableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(records.records(this.cometOffsetsConfig().topicName())).asScala()).foreach((Function1 & Serializable & scala.Serializable)r -> (scala.collection.mutable.Map)offsets.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(r.key()), r.value())));
                records = consumer.poll(Duration.ofMillis(100L));
            }
        } finally {
            // The records are already copied into `offsets`; the consumer is no longer needed.
            consumer.close();
        }
        // Split each key into (topic, partition), group by topic, and keep the requested topic.
        Option res = ((TraversableLike)offsets.keys().map((Function1 & Serializable & scala.Serializable)k -> {
            String[] tab = new StringOps(Predef$.MODULE$.augmentString(k)).split('/');
            return new Tuple3((Object)tab[0], (Object)tab[1], offsets.apply(k));
        }, Iterable$.MODULE$.canBuildFrom())).groupBy((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple3 tuple3 = x0$1;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            return (String)tuple3._1();
        }).mapValues((Function1 & Serializable & scala.Serializable)x$1 -> ((TraversableOnce)x$1.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple3 tuple3 = x0$2;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            String partition = (String)tuple3._2();
            String offset = (String)tuple3._3();
            return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(partition)).toInt(), new StringOps(Predef$.MODULE$.augmentString(offset)).toLong());
        }, Iterable$.MODULE$.canBuildFrom())).toList()).get((Object)topicConfigName);
        return res;
    }

    /**
     * Builds the file lock guarding the offsets of {@code topicConfigName}.
     *
     * @param topicConfigName logical topic name; embedded in the lock file name
     * @return a lock on {@code comet_offsets_<topicConfigName>.lock} under the configured lock path
     */
    private FileLock cometOffsetsLock(String topicConfigName) {
        String lockFileName = "comet_offsets_" + topicConfigName + ".lock";
        Path lockPath = new Path(this.settings.comet().lock().path(), lockFileName);
        return new FileLock(lockPath, this.settings.storageHandler());
    }

    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromFile(String topicConfigName) {
        FileLock qual$1 = this.cometOffsetsLock(topicConfigName);
        long x$1 = qual$1.doExclusively$default$1();
        Function0 & Serializable & scala.Serializable x$2 = (Function0 & Serializable & scala.Serializable)() -> {
            None$ none$;
            Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
            boolean bl = $this.settings.storageHandler().exists(cometOffsetsPath);
            if (!bl) {
                BoxedUnit boxedUnit;
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Cannot load comet offsets: {} file does not exist", new Object[]{cometOffsetsPath});
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                none$ = None$.MODULE$;
            } else if (bl) {
                BoxedUnit boxedUnit;
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Loading comet offsets to path {}", new Object[]{cometOffsetsPath});
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                List res = (List)((List)YamlSerializer$.MODULE$.mapper().readValue($this.settings.storageHandler().read(cometOffsetsPath), List.class)).map((Function1 & Serializable & scala.Serializable)str -> {
                    String[] tab = new StringOps(Predef$.MODULE$.augmentString(str)).split(',');
                    return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(tab[0])).toInt(), new StringOps(Predef$.MODULE$.augmentString(tab[1])).toLong());
                }, List$.MODULE$.canBuildFrom());
                none$ = new Some((Object)res);
            } else {
                throw new MatchError((Object)BoxesRunTime.boxToBoolean((boolean)bl));
            }
            return none$;
        };
        return (Option)qual$1.doExclusively(x$1, x$2);
    }

    /**
     * Returns the saved offsets for {@code topicConfigName}, reading them from the
     * offsets topic in STREAM mode or from the offsets file in FILE mode.
     *
     * @param topicConfigName logical topic name whose offsets are wanted
     * @return the stored (partition, offset) pairs, if any
     */
    public Option<List<Tuple2<Object, Object>>> topicCurrentOffsets(String topicConfigName) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            return this.topicCurrentOffsetsFromStream(topicConfigName);
        }
        if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            return this.topicCurrentOffsetsFromFile(topicConfigName);
        }
        throw new Exception("Should never happen");
    }

    /**
     * Renders offsets as a JSON string of the form
     * {@code {"<topicName>":{"<partition>": <offset>,...}}}.
     *
     * <p>The result used to be held in a local typed as {@code None$} that was also
     * assigned a {@code Some}. Each branch now returns its value directly.
     * {@code topicName} is inserted verbatim, without JSON escaping.
     *
     * @param topicName topic name used as the outer JSON key
     * @param offsets   (partition, offset) pairs to render
     * @return the JSON text, or empty when {@code offsets} is empty
     */
    public Option<String> offsetsAsJson(String topicName, List<Tuple2<Object, Object>> offsets) {
        if (offsets.isEmpty()) {
            return (Option)None$.MODULE$;
        }
        String offsetsAsString = ((TraversableOnce)offsets.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            int partition = tuple2._1$mcI$sp();
            long partitionOffset = tuple2._2$mcJ$sp();
            return new StringBuilder(4).append("\"").append(partition).append("\": ").append(partitionOffset).toString();
        }, List$.MODULE$.canBuildFrom())).mkString(",");
        String json = new StringBuilder(7).append("{\"").append(topicName).append("\":{").append(offsetsAsString).append("}}").toString();
        return (Option)new Some((Object)json);
    }

    public Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch(String topicConfigName, SparkSession session, Settings.KafkaTopicConfig config) {
        BoxedUnit boxedUnit;
        long EARLIEST_OFFSET = -2L;
        List startOffsets = (List)this.topicCurrentOffsets(topicConfigName).getOrElse((Function0 & Serializable & scala.Serializable)() -> (List)this.topicPartitions(config.topicName()).map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), EARLIEST_OFFSET), List$.MODULE$.canBuildFrom()));
        List<Tuple2<Object, Object>> endOffsets = this.topicEndOffsets(config.topicName(), config.accessOptions());
        scala.collection.immutable.Map withOffsetsTopicOptions = config.accessOptions().$plus$plus((GenTraversableOnce)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"startingOffsets"), this.offsetsAsJson(config.topicName(), (List<Tuple2<Object, Object>>)startOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "earliest")), (List)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"endingOffsets"), this.offsetsAsJson(config.topicName(), endOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "latest")), (List)Nil$.MODULE$)));
        DataFrameReader reader = session.read().format("kafka");
        Dataset df = ((DataFrameReader)withOffsetsTopicOptions.foldLeft((Object)reader, (Function2 & Serializable & scala.Serializable)(x0$1, x1$1) -> {
            Tuple2 tuple2;
            DataFrameReader reader;
            block3: {
                Tuple2 tuple22;
                block2: {
                    tuple22 = new Tuple2(x0$1, x1$1);
                    if (tuple22 == null) break block2;
                    reader = (DataFrameReader)tuple22._1();
                    tuple2 = (Tuple2)tuple22._2();
                    if (tuple2 != null) break block3;
                }
                throw new MatchError((Object)tuple22);
            }
            String k = (String)tuple2._1();
            String v = (String)tuple2._2();
            DataFrameReader dataFrameReader = reader.option(k, v);
            return dataFrameReader;
        })).load().selectExpr((Seq)config.fields().map((Function1 & Serializable & scala.Serializable)x -> new StringBuilder(6).append("CAST(").append((String)x).append(")").toString(), List$.MODULE$.canBuildFrom()));
        if (this.logger().underlying().isInfoEnabled()) {
            df.printSchema();
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return new Tuple2((Object)df, endOffsets);
    }

    /**
     * Opens a streaming read on Spark's "kafka" source, applying every access option of
     * {@code config} to the reader and selecting each configured field as
     * {@code CAST(<field>)}.
     *
     * @param session Spark session used to build the stream reader
     * @param config  topic configuration (access options, fields)
     * @return the streaming data frame
     */
    public Dataset<Row> consumeTopicStreaming(SparkSession session, Settings.KafkaTopicConfig config) {
        DataStreamReader reader = session.readStream().format("kafka");
        DataStreamReader configured = (DataStreamReader)config.accessOptions().foldLeft((Object)reader, (Function2 & Serializable & scala.Serializable)(acc, entry) -> {
            DataStreamReader current = (DataStreamReader)acc;
            Tuple2 option = (Tuple2)entry;
            if (option == null) {
                throw new MatchError((Object)new Tuple2(acc, entry));
            }
            String k = (String)option._1();
            String v = (String)option._2();
            return current.option(k, v);
        });
        Seq castExpressions = (Seq)config.fields().map((Function1 & Serializable & scala.Serializable)x -> "CAST(" + (String)x + ")", List$.MODULE$.canBuildFrom());
        return configured.load().selectExpr(castExpressions);
    }

    /**
     * Writes {@code df} to the configured topic through Spark's "kafka" sink.
     *
     * <p>Each configured field is selected as {@code CAST(<field>)}, every access option
     * of {@code config} is applied to the writer, and the "topic" option is set to the
     * configured topic name before saving.
     *
     * @param config topic configuration (name, access options, fields)
     * @param df     the data frame to write
     */
    public void sinkToTopic(Settings.KafkaTopicConfig config, Dataset<Row> df) {
        Seq castExpressions = (Seq)config.fields().map((Function1 & Serializable & scala.Serializable)x -> "CAST(" + (String)x + ")", List$.MODULE$.canBuildFrom());
        DataFrameWriter writer = df.selectExpr(castExpressions).write().format("kafka");
        DataFrameWriter configured = (DataFrameWriter)config.accessOptions().foldLeft((Object)writer, (Function2 & Serializable & scala.Serializable)(acc, entry) -> {
            DataFrameWriter current = (DataFrameWriter)acc;
            Tuple2 option = (Tuple2)entry;
            if (option == null) {
                throw new MatchError((Object)new Tuple2(acc, entry));
            }
            String k = (String)option._1();
            String v = (String)option._2();
            return current.option(k, v);
        });
        configured.option("topic", config.topicName()).save();
    }

    /**
     * Creates the client: resolves the offsets mode (STREAM when not configured), builds
     * the admin client from the server options, then prepares offsets storage.
     *
     * <p>In STREAM mode the comet offsets topic is created, when absent, with
     * {@code cleanup.policy=compact}. In FILE mode the directory named after the comet
     * offsets topic is created when it does not exist.
     *
     * <p>If preparing the storage fails, the admin client is now closed before the
     * failure is rethrown. The caller never receives the instance in that case and so
     * could not close it. The storage path is also built once instead of twice.
     *
     * @param kafkaConfig Kafka configuration providing server options and topics
     * @param settings    application settings (offsets mode, storage handler, lock path)
     */
    public KafkaClient(Settings.KafkaConfig kafkaConfig, Settings settings) {
        this.settings = settings;
        StrictLogging.$init$((StrictLogging)this);
        this.cometOffsetsMode = (Mode)settings.comet().kafka().cometOffsetsMode().map((Function1 & Serializable & scala.Serializable)value -> Mode$.MODULE$.fromString((String)value)).getOrElse((Function0 & Serializable & scala.Serializable)() -> Mode$STREAM$.MODULE$);
        this.serverOptions = kafkaConfig.serverOptions();
        this.cometOffsetsConfig = (Settings.KafkaTopicConfig)kafkaConfig.topics().apply((Object)"comet_offsets");
        this.props = new Properties();
        this.serverOptions().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            String k = (String)tuple2._1();
            String v = (String)tuple2._2();
            return this.props().put(k, v);
        });
        this.client = AdminClient.create((Properties)this.props());
        try {
            Mode mode = this.cometOffsetsMode();
            if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
                this.createTopicIfNotPresent(new NewTopic(this.cometOffsetsConfig().topicName(), this.cometOffsetsConfig().partitions(), this.cometOffsetsConfig().replicationFactor()), (scala.collection.immutable.Map<String, String>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)"compact")}))));
            } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
                Path offsetsDir = new Path(this.cometOffsetsConfig().topicName());
                if (!settings.storageHandler().exists(offsetsDir)) {
                    settings.storageHandler().mkdirs(offsetsDir);
                }
            } else {
                throw new Exception("Should never happen");
            }
        } catch (Throwable failure) {
            // Construction failed after the admin client was opened: release it here.
            try {
                this.client.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }
}

