/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.utils.kafka;

import ai.starlake.config.Settings;
import ai.starlake.schema.model.Mode;
import ai.starlake.schema.model.Mode$;
import ai.starlake.schema.model.Mode$FILE$;
import ai.starlake.schema.model.Mode$STREAM$;
import ai.starlake.utils.FileLock;
import ai.starlake.utils.YamlSerializer$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\tEc\u0001\u0002\u0010 \u0001!B\u0001\"\u0014\u0001\u0003\u0002\u0003\u0006IA\u0014\u0005\tA\u0002\u0011\t\u0011)A\u0006C\")Q\r\u0001C\u0001M\"9A\u000e\u0001b\u0001\n\u0003i\u0007B\u0002<\u0001A\u0003%a\u000eC\u0004x\u0001\t\u0007I\u0011\u0001=\t\u000f\u0005%\u0001\u0001)A\u0005s\"I\u00111\u0002\u0001C\u0002\u0013\u0005\u0011Q\u0002\u0005\t\u0003+\u0001\u0001\u0015!\u0003\u0002\u0010!I\u0011q\u0003\u0001C\u0002\u0013\u0005\u0011\u0011\u0004\u0005\t\u0003O\u0001\u0001\u0015!\u0003\u0002\u001c!Q\u0011\u0011\u0006\u0001\t\u0006\u0004%\t!a\u000b\t\u000f\u0005}\u0002\u0001\"\u0001\u0002B!9\u0011\u0011\n\u0001\u0005\u0002\u0005-\u0003bBA)\u0001\u0011\u0005\u00111\u000b\u0005\b\u0003G\u0002A\u0011AA3\u0011\u001d\t\t\n\u0001C\u0005\u0003'Cq!!.\u0001\t\u0013\t9\fC\u0004\u0002>\u0002!I!a0\t\u000f\u0005\r\u0007\u0001\"\u0001\u0002F\"9\u0011\u0011\u001b\u0001\u0005\u0002\u0005M\u0007bBAp\u0001\u0011%\u0011\u0011\u001d\u0005\b\u0003W\u0004A\u0011BAw\u0011\u001d\tI\u0010\u0001C\u0005\u0003wDq!a@\u0001\t\u0003\u0011\t\u0001C\u0004\u0003\u0006\u0001!\tAa\u0002\t\u000f\t=\u0001\u0001\"\u0001\u0003\u0012!9!q\b\u0001\u0005\u0002\t\u0005\u0003b\u0002B$\u0001\u0011\u0005!\u0011\n\u0002\f\u0017\u000647.Y\"mS\u0016tGO\u0003\u0002!C\u0005)1.\u00194lC*\u0011!eI\u0001\u0006kRLGn\u001d\u0006\u0003I\u0015\n\u0001b\u001d;be2\f7.\u001a\u0006\u0002M\u0005\u0011\u0011-[\u0002\u0001'\u0015\u0001\u0011fL\u001dF!\tQS&D\u0001,\u0015\u0005a\u0013!B:dC2\f\u0017B\u0001\u0018,\u0005\u0019\te.\u001f*fMB\u0011\u0001gN\u0007\u0002c)\u0011!gM\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003iU\n\u0001\u0002^=qKN\fg-\u001a\u0006\u0002m\u0005\u00191m\\7\n\u0005a\n$!D*ue&\u001cG\u000fT8hO&tw\r\u0005\u0002;\u00076\t1H\u0003\u0002={\u0005\u00191/\u001d7\u000b\u0005yz\u0014!B:qCJ\\'B\u0001!B\u0003\u0019\t\u0007/Y2iK*\t!)A\u0002pe\u001eL!\u0001R\u001e\u0003\u001d\u0011\u000bG/Y:fi2{wmZ5oOB\u0011aiS\u0007\u0002\u000f*\u0011\u0001*S\u0001\u0005Y\u0006twMC\u0001K\u0003\u0011Q\u00
17M^1\n\u00051;%!D!vi>\u001cEn\\:fC\ndW-A\u0006lC\u001a\\\u0017mQ8oM&<\u0007CA(^\u001d\t\u0001&L\u0004\u0002R1:\u0011!k\u0016\b\u0003'Zk\u0011\u0001\u0016\u0006\u0003+\u001e\na\u0001\u0010:p_Rt\u0014\"\u0001\u0014\n\u0005\u0011*\u0013BA-$\u0003\u0019\u0019wN\u001c4jO&\u00111\fX\u0001\t'\u0016$H/\u001b8hg*\u0011\u0011lI\u0005\u0003=~\u00131bS1gW\u0006\u001cuN\u001c4jO*\u00111\fX\u0001\tg\u0016$H/\u001b8hgB\u0011!mY\u0007\u00029&\u0011A\r\u0018\u0002\t'\u0016$H/\u001b8hg\u00061A(\u001b8jiz\"\"aZ6\u0015\u0005!T\u0007CA5\u0001\u001b\u0005y\u0002\"\u00021\u0004\u0001\b\t\u0007\"B'\u0004\u0001\u0004q\u0015\u0001E2p[\u0016$xJ\u001a4tKR\u001cXj\u001c3f+\u0005q\u0007CA8u\u001b\u0005\u0001(BA9s\u0003\u0015iw\u000eZ3m\u0015\t\u00198%\u0001\u0004tG\",W.Y\u0005\u0003kB\u0014A!T8eK\u0006\t2m\\7fi>3gm]3ug6{G-\u001a\u0011\u0002\u001bM,'O^3s\u001fB$\u0018n\u001c8t+\u0005I\bC\u0002>\u007f\u0003\u0007\t\u0019A\u0004\u0002|yB\u00111kK\u0005\u0003{.\na\u0001\u0015:fI\u00164\u0017bA@\u0002\u0002\t\u0019Q*\u00199\u000b\u0005u\\\u0003c\u0001>\u0002\u0006%!\u0011qAA\u0001\u0005\u0019\u0019FO]5oO\u0006q1/\u001a:wKJ|\u0005\u000f^5p]N\u0004\u0013AE2p[\u0016$xJ\u001a4tKR\u001c8i\u001c8gS\u001e,\"!a\u0004\u0011\u0007=\u000b\t\"C\u0002\u0002\u0014}\u0013\u0001cS1gW\u0006$v\u000e]5d\u0007>tg-[4\u0002'\r|W.\u001a;PM\u001a\u001cX\r^:D_:4\u0017n\u001a\u0011\u0002\u000bA\u0014x\u000e]:\u0016\u0005\u0005m\u0001\u0003BA\u000f\u0003Gi!!a\b\u000b\u0007\u0005\u0005\u0012*\u0001\u0003vi&d\u0017\u0002BA\u0013\u0003?\u0011!\u0002\u0015:pa\u0016\u0014H/[3t\u0003\u0019\u0001(o\u001c9tA\u000511\r\\5f]R,\"!!\f\u0011\t\u0005=\u00121H\u0007\u0003\u0003cQA!a\r\u00026\u0005)\u0011\rZ7j]*!\u0011qGA\u001d\u0003\u001d\u0019G.[3oiNT!\u0001I 
\n\t\u0005u\u0012\u0011\u0007\u0002\f\u0003\u0012l\u0017N\\\"mS\u0016tG/A\u0003dY>\u001cX\r\u0006\u0002\u0002DA\u0019!&!\u0012\n\u0007\u0005\u001d3F\u0001\u0003V]&$\u0018a\u00033fY\u0016$X\rV8qS\u000e$B!a\u0011\u0002N!9\u0011q\n\bA\u0002\u0005\r\u0011!\u0003;pa&\u001cg*Y7f\u0003]\u0019'/Z1uKR{\u0007/[2JM:{G\u000f\u0015:fg\u0016tG\u000f\u0006\u0004\u0002D\u0005U\u0013q\f\u0005\b\u0003/z\u0001\u0019AA-\u0003\u0015!x\u000e]5d!\u0011\ty#a\u0017\n\t\u0005u\u0013\u0011\u0007\u0002\t\u001d\u0016<Hk\u001c9jG\"1\u0011\u0011M\bA\u0002e\fAaY8oM\u0006yAo\u001c9jG\u0016sGm\u00144gg\u0016$8\u000f\u0006\u0004\u0002h\u0005-\u0015Q\u0012\t\u0007\u0003S\n\u0019(!\u001f\u000f\t\u0005-\u0014q\u000e\b\u0004'\u00065\u0014\"\u0001\u0017\n\u0007\u0005E4&A\u0004qC\u000e\\\u0017mZ3\n\t\u0005U\u0014q\u000f\u0002\u0005\u0019&\u001cHOC\u0002\u0002r-\u0002rAKA>\u0003\u007f\n))C\u0002\u0002~-\u0012a\u0001V;qY\u0016\u0014\u0004c\u0001\u0016\u0002\u0002&\u0019\u00111Q\u0016\u0003\u0007%sG\u000fE\u0002+\u0003\u000fK1!!#,\u0005\u0011auN\\4\t\u000f\u0005=\u0003\u00031\u0001\u0002\u0004!1\u0011q\u0012\tA\u0002e\fQ\"Y2dKN\u001cx\n\u001d;j_:\u001c\u0018!E3yiJ\f7\r\u001e)beRLG/[8ogR1\u0011QSAS\u0003O\u0003b!a&\u0002t\u0005eeb\u0001\u0016\u0002pA!\u00111TAQ\u001b\t\tiJ\u0003\u0003\u0002 
\u0006e\u0012AB2p[6|g.\u0003\u0003\u0002$\u0006u%A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\u001c\u0005\b\u0003\u001f\n\u0002\u0019AA\u0002\u0011\u001d\tI+\u0005a\u0001\u0003W\u000b\u0001bY8ogVlWM\u001d\t\t\u0003[\u000b\t,a\u0001\u0002\u00045\u0011\u0011q\u0016\u0006\u0005\u0003S\u000b)$\u0003\u0003\u00024\u0006=&!D&bM.\f7i\u001c8tk6,'/A\u0006oK^\u001cuN\\:v[\u0016\u0014HCBAV\u0003s\u000bY\fC\u0004\u0002PI\u0001\r!a\u0001\t\r\u0005=%\u00031\u0001z\u0003)\u0011W/\u001b7e!J|\u0007o\u001d\u000b\u0005\u00037\t\t\r\u0003\u0004\u0002\u0010N\u0001\r!_\u0001\u0011i>\u0004\u0018nY*bm\u0016|eMZ:fiN$\u0002\"a\u0011\u0002H\u0006-\u0017Q\u001a\u0005\b\u0003\u0013$\u0002\u0019AA\u0002\u0003=!x\u000e]5d\u0007>tg-[4OC6,\u0007BBAH)\u0001\u0007\u0011\u0010C\u0004\u0002PR\u0001\r!a\u001a\u0002\u000f=4gm]3ug\u0006!\u0012\rZ7j]R{\u0007/[2QCJ$\u0018\u000e^5p]N$B!!6\u0002^B1\u0011\u0011NA:\u0003/\u0004B!a'\u0002Z&!\u00111\\AO\u0005I!v\u000e]5d!\u0006\u0014H/\u001b;j_:LeNZ8\t\u000f\u0005=S\u00031\u0001\u0002\u0004\u0005iBo\u001c9jG\u000e+(O]3oi>3gm]3ug\u001a\u0013x.\\*ue\u0016\fW\u000e\u0006\u0003\u0002d\u0006%\b#\u0002\u0016\u0002f\u0006\u001d\u0014bAAtW\t1q\n\u001d;j_:Dq!!3\u0017\u0001\u0004\t\u0019!\u0001\td_6,Go\u00144gg\u0016$8\u000fT8dWR!\u0011q^A|!\u0011\t\t0a=\u000e\u0003\u0005J1!!>\"\u0005!1\u0015\u000e\\3M_\u000e\\\u0007bBAe/\u0001\u0007\u00111A\u0001\u001ci>\u0004\u0018nY\"veJ,g\u000e^(gMN,Go\u001d$s_64\u0015\u000e\\3\u0015\t\u0005\r\u0018Q 
\u0005\b\u0003\u0013D\u0002\u0019AA\u0002\u0003M!x\u000e]5d\u0007V\u0014(/\u001a8u\u001f\u001a47/\u001a;t)\u0011\t\u0019Oa\u0001\t\u000f\u0005%\u0017\u00041\u0001\u0002\u0004\u0005iqN\u001a4tKR\u001c\u0018i\u001d&t_:$bA!\u0003\u0003\f\t5\u0001#\u0002\u0016\u0002f\u0006\r\u0001bBA(5\u0001\u0007\u00111\u0001\u0005\b\u0003\u001fT\u0002\u0019AA4\u0003E\u0019wN\\:v[\u0016$v\u000e]5d\u0005\u0006$8\r\u001b\u000b\t\u0005'\u0011\tDa\r\u0003>A9!&a\u001f\u0003\u0016\u0005\u001d\u0004\u0003\u0002B\f\u0005WqAA!\u0007\u0003*9!!1\u0004B\u0014\u001d\u0011\u0011iB!\n\u000f\t\t}!1\u0005\b\u0004'\n\u0005\u0012\"\u0001\"\n\u0005\u0001\u000b\u0015B\u0001 @\u0013\taT(C\u0002\u0002rmJAA!\f\u00030\tIA)\u0019;b\rJ\fW.\u001a\u0006\u0004\u0003cZ\u0004bBAe7\u0001\u0007\u00111\u0001\u0005\b\u0005kY\u0002\u0019\u0001B\u001c\u0003\u001d\u0019Xm]:j_:\u00042A\u000fB\u001d\u0013\r\u0011Yd\u000f\u0002\r'B\f'o[*fgNLwN\u001c\u0005\u00073n\u0001\r!a\u0004\u0002+\r|gn];nKR{\u0007/[2TiJ,\u0017-\\5oOR1!Q\u0003B\"\u0005\u000bBqA!\u000e\u001d\u0001\u0004\u00119\u0004\u0003\u0004Z9\u0001\u0007\u0011qB\u0001\fg&t7\u000eV8U_BL7\r\u0006\u0004\u0002D\t-#Q\n\u0005\u00073v\u0001\r!a\u0004\t\u000f\t=S\u00041\u0001\u0003\u0016\u0005\u0011AM\u001a")
public class KafkaClient
implements StrictLogging,
DatasetLogging,
AutoCloseable {
    // Kafka admin client. Created on first use by client$lzycompute(), hence not final.
    private AdminClient client;
    // Application settings handed to the constructor.
    private final Settings settings;
    // Where consumed offsets are persisted (compared against Mode.STREAM / Mode.FILE in this class).
    private final Mode cometOffsetsMode;
    // Options copied into `props` and used to build the admin client.
    private final scala.collection.immutable.Map<String, String> serverOptions;
    // Configuration of the "comet_offsets" topic entry of the Kafka configuration.
    private final Settings.KafkaTopicConfig cometOffsetsConfig;
    // java.util.Properties view of `serverOptions`, filled in by the constructor.
    private final Properties props;
    // Not final: the value is installed through the StrictLogging trait setter
    // (com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq) rather than by a direct
    // assignment in the constructor, which Java does not allow for a final field.
    private Logger logger;
    // Lazy-initialisation flag for `client`; set to true by client$lzycompute() once it is built.
    private volatile boolean bitmap$0;

    /**
     * Wraps a dataset in the logging helper supplied by {@link DatasetLogging}.
     *
     * @param ds dataset to wrap
     * @return the helper produced by {@code DatasetLogging.DatasetHelper$} for this instance
     */
    @Override
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> ds) {
        DatasetLogging.DatasetHelper<T> helper = DatasetLogging.DatasetHelper$(this, ds);
        return helper;
    }

    /**
     * @return the logger stored on this instance
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the logger for this instance.
     *
     * <p>NOTE(review): presumably invoked by {@code StrictLogging.$init$} during construction;
     * confirm against the scala-logging trait.
     *
     * @param x$1 logger to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        logger = x$1;
    }

    /**
     * @return the offsets persistence mode resolved by the constructor
     */
    public Mode cometOffsetsMode() {
        return cometOffsetsMode;
    }

    /**
     * @return the server options taken from the Kafka configuration by the constructor
     */
    public scala.collection.immutable.Map<String, String> serverOptions() {
        return serverOptions;
    }

    /**
     * @return the topic configuration registered under the key {@code "comet_offsets"}
     */
    public Settings.KafkaTopicConfig cometOffsetsConfig() {
        return cometOffsetsConfig;
    }

    /**
     * @return the properties built from the server options; used to create the admin client
     */
    public Properties props() {
        return props;
    }

    /**
     * Slow path of the lazily created admin client.
     *
     * <p>Under this instance's monitor, creates the {@link AdminClient} from {@link #props()}
     * unless {@code bitmap$0} shows it already exists, then marks it as created.
     *
     * @return the admin client held by this instance
     */
    private AdminClient client$lzycompute() {
        KafkaClient kafkaClient = this;
        synchronized (kafkaClient) {
            // Checked inside the lock so the client is created only once.
            if (!this.bitmap$0) {
                this.client = AdminClient.create((Properties)this.props());
                // The flag is set only after the field has been assigned.
                this.bitmap$0 = true;
            }
        }
        return this.client;
    }

    /**
     * Returns the admin client, creating it on first access.
     *
     * @return the admin client held by this instance
     */
    public AdminClient client() {
        if (this.bitmap$0) {
            return this.client;
        }
        return this.client$lzycompute();
    }

    /**
     * Closes the admin client if one has been created.
     *
     * <p>The client is created lazily. Going through {@link #client()} here would build a
     * brand-new {@link AdminClient} only to close it straight away, so the lazy-init flag is
     * consulted instead and nothing happens when no client exists.
     */
    @Override
    public void close() {
        // bitmap$0 becomes true only after client$lzycompute() has assigned the field.
        if (this.bitmap$0) {
            this.client.close();
        }
    }

    public void deleteTopic(String topicName) {
        block4: {
            BoxedUnit boxedUnit;
            BoxedUnit boxedUnit2;
            boolean found = ((Set)this.client().listTopics().names().get()).contains(topicName);
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(((TraversableOnce)JavaConverters$.MODULE$.asScalaSetConverter((Set)this.client().listTopics().names().get()).asScala()).toSet().mkString("\n"));
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            if (!found) break block4;
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Deleting topic {}", new Object[]{topicName});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            this.client().deleteTopics(JavaConverters$.MODULE$.asJavaCollectionConverter((Iterable)new .colon.colon((Object)topicName, (List)Nil$.MODULE$)).asJavaCollection());
        }
    }

    /**
     * Creates a topic unless a topic with the same name is already listed by the broker.
     *
     * <p>Blocks until the creation request has completed.
     *
     * @param topic topic to create
     * @param conf  topic-level configuration applied to {@code topic} before creation
     */
    public void createTopicIfNotPresent(NewTopic topic, scala.collection.immutable.Map<String, String> conf) {
        Set existingTopics = (Set)this.client().listTopics().names().get();
        if (existingTopics.contains(topic.name())) {
            return;
        }
        java.util.Map topicSettings = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(conf).asJava();
        NewTopic configuredTopic = topic.configs(topicSettings);
        this.client().createTopics(Collections.singleton(configuredTopic)).all().get();
    }

    /**
     * Reads the current end position of every partition of a topic.
     *
     * <p>A short-lived consumer is assigned all partitions, moved to their end, and queried for
     * its position on each one. The consumer is closed even when one of those calls fails.
     *
     * @param topicName     topic to inspect
     * @param accessOptions consumer properties used to connect
     * @return one {@code (partition, position)} pair per partition
     * @throws RuntimeException whatever the consumer raised, after it has been logged
     */
    public List<Tuple2<Object, Object>> topicEndOffsets(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        try {
            KafkaConsumer<String, String> consumer = this.newConsumer(topicName, accessOptions);
            try {
                List<TopicPartition> partitions = this.extractPartitions(topicName, consumer);
                Collection javaPartitions = (Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava();
                consumer.assign(javaPartitions);
                consumer.seekToEnd(javaPartitions);
                List result = (List)partitions.map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), consumer.position(p)), List$.MODULE$.canBuildFrom());
                return result;
            } finally {
                // Release the consumer's resources on success and on failure.
                consumer.close();
            }
        } catch (RuntimeException e) {
            // Log through the class logger instead of printing to stderr, then propagate.
            this.logger().underlying().error("Failed to read end offsets of topic " + topicName, e);
            throw e;
        }
    }

    /*
     * WARNING - void declaration
     */
    private List<TopicPartition> extractPartitions(String topicName, KafkaConsumer<String, String> consumer) {
        void var3_3;
        List partitions = ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topicName)).asScala()).map((Function1 & Serializable & scala.Serializable)info -> new TopicPartition(topicName, info.partition()), Buffer$.MODULE$.canBuildFrom())).toList();
        return var3_3;
    }

    /**
     * Builds a consumer from the given access options, logging each option at INFO level.
     *
     * @param topicName     topic name, used only in the log output
     * @param accessOptions consumer properties
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> newConsumer(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        Properties consumerProps = this.buildProps(accessOptions);
        if (this.logger().underlying().isInfoEnabled()) {
            // The level is re-checked for the header line, as in the generated code.
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("access options for topic {} ==>", new Object[]{topicName});
            }
            ((IterableLike)JavaConverters$.MODULE$.propertiesAsScalaMapConverter(consumerProps).asScala()).foreach((Function1 & Serializable & scala.Serializable)entry -> {
                KafkaClient.$anonfun$newConsumer$1(this, entry);
                return BoxedUnit.UNIT;
            });
        }
        return new KafkaConsumer(consumerProps);
    }

    /*
     * WARNING - void declaration
     */
    private Properties buildProps(scala.collection.immutable.Map<String, String> accessOptions) {
        void var2_2;
        Properties props = new Properties();
        accessOptions.foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        return var2_2;
    }

    /**
     * Persists the given offsets for a topic configuration.
     *
     * <p>In STREAM mode each offset is sent to the offsets topic with key
     * {@code "<topicConfigName>/<partition>"} and the offset as value. In FILE mode the offsets
     * are serialised as {@code "<partition>,<offset>"} strings and written, under the offsets
     * file lock, to {@code <offsets topic name>/<topicConfigName>} through the storage handler.
     *
     * @param topicConfigName name of the topic configuration the offsets belong to
     * @param accessOptions   producer properties (used in STREAM mode only)
     * @param offsets         {@code (partition, offset)} pairs to save
     * @throws IllegalStateException if the offsets mode is neither STREAM nor FILE
     */
    public void topicSaveOffsets(String topicConfigName, scala.collection.immutable.Map<String, String> accessOptions, List<Tuple2<Object, Object>> offsets) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            KafkaProducer producer = new KafkaProducer(this.buildProps(accessOptions));
            try {
                offsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                    Tuple2 tuple2 = x0$1;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    int partition = tuple2._1$mcI$sp();
                    long offset = tuple2._2$mcJ$sp();
                    Future future = producer.send(new ProducerRecord(this.cometOffsetsConfig().topicName(), (Object)(topicConfigName + "/" + partition), (Object)Long.toString(offset)));
                    return future;
                });
            } finally {
                // Close the producer even when a send fails, so it is not leaked.
                producer.close();
            }
        } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            FileLock lock = this.cometOffsetsLock(topicConfigName);
            long timeout = lock.doExclusively$default$1();
            JFunction0.mcV.sp & Serializable & scala.Serializable writeOffsets = (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Saving comet offsets to path {}", new Object[]{cometOffsetsPath});
                }
                this.settings.storageHandler().write(YamlSerializer$.MODULE$.serializeObject(offsets.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
                    Tuple2 tuple2 = x0$2;
                    if (tuple2 == null) {
                        throw new MatchError((Object)tuple2);
                    }
                    int partition = tuple2._1$mcI$sp();
                    long offset = tuple2._2$mcJ$sp();
                    return Integer.toString(partition) + "," + Long.toString(offset);
                }, List$.MODULE$.canBuildFrom())), cometOffsetsPath);
            };
            lock.doExclusively(timeout, writeOffsets);
        } else {
            throw new IllegalStateException("Should never happen");
        }
    }

    /**
     * Describes a topic through the admin client and returns its partitions.
     *
     * <p>Blocks until the describe request has completed.
     *
     * @param topicName topic to describe
     * @return the partition descriptions reported for {@code topicName}
     */
    public List<TopicPartitionInfo> adminTopicPartitions(String topicName) {
        java.util.Map descriptions = (java.util.Map)this.client().describeTopics(Collections.singleton(topicName)).all().get();
        TopicDescription description = (TopicDescription)descriptions.get(topicName);
        return ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(description.partitions()).asScala()).toList();
    }

    /**
     * Rebuilds the saved offsets of a topic configuration by replaying the offsets topic.
     *
     * <p>The offsets topic is read from the beginning. Record keys have the form
     * {@code "<topicConfigName>/<partition>"} and the value is the offset; a later record for the
     * same key replaces an earlier one. Reading stops at the first poll that returns no records.
     * The consumer is closed before the result is computed, including when reading fails.
     *
     * <p>NOTE(review): a 100 ms poll that comes back empty ends the replay, so a slow broker
     * could make this return fewer offsets than are stored; confirm this is acceptable.
     *
     * @param topicConfigName name of the topic configuration to look up
     * @return the {@code (partition, offset)} pairs found for {@code topicConfigName}, or an
     *         empty option when the offsets topic holds none
     */
    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromStream(String topicConfigName) {
        Properties props = new Properties();
        this.cometOffsetsConfig().accessOptions().foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        scala.collection.mutable.Map offsets = Map$.MODULE$.empty();
        KafkaConsumer consumer = new KafkaConsumer(props);
        try {
            List<TopicPartition> partitions = this.extractPartitions(this.cometOffsetsConfig().topicName(), (KafkaConsumer<String, String>)consumer);
            consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
            consumer.seekToBeginning((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
            while (records != null && !records.isEmpty()) {
                ((IterableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(records.records(this.cometOffsetsConfig().topicName())).asScala()).foreach((Function1 & Serializable & scala.Serializable)r -> (scala.collection.mutable.Map)offsets.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(r.key()), r.value())));
                records = consumer.poll(Duration.ofMillis(100L));
            }
        } finally {
            // The consumer was previously never closed; release it whatever happens above.
            consumer.close();
        }
        Option res = ((TraversableLike)offsets.keys().map((Function1 & Serializable & scala.Serializable)k -> {
            String[] tab = new StringOps(Predef$.MODULE$.augmentString(k)).split('/');
            return new Tuple3((Object)tab[0], (Object)tab[1], offsets.apply(k));
        }, Iterable$.MODULE$.canBuildFrom())).groupBy((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple3 tuple3 = x0$1;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            return (String)tuple3._1();
        }).mapValues((Function1 & Serializable & scala.Serializable)x$1 -> ((TraversableOnce)x$1.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple3 tuple3 = x0$2;
            if (tuple3 == null) {
                throw new MatchError((Object)tuple3);
            }
            String partition = (String)tuple3._2();
            String offset = (String)tuple3._3();
            return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(partition)).toInt(), new StringOps(Predef$.MODULE$.augmentString(offset)).toLong());
        }, Iterable$.MODULE$.canBuildFrom())).toList()).get((Object)topicConfigName);
        return res;
    }

    /**
     * Builds the file lock that guards the offsets file of a topic configuration.
     *
     * @param topicConfigName name of the topic configuration
     * @return a lock on {@code comet_offsets_<topicConfigName>.lock} under the configured lock path
     */
    private FileLock cometOffsetsLock(String topicConfigName) {
        String lockFileName = "comet_offsets_" + topicConfigName + ".lock";
        return new FileLock(new Path(this.settings.comet().lock().path(), lockFileName), this.settings.storageHandler());
    }

    /**
     * Loads the saved offsets of a topic configuration from the offsets file.
     *
     * <p>Under the offsets file lock, reads {@code <offsets topic name>/<topicConfigName>} through
     * the storage handler and parses each {@code "<partition>,<offset>"} entry.
     *
     * @param topicConfigName name of the topic configuration to look up
     * @return the parsed {@code (partition, offset)} pairs, or an empty option when the file
     *         does not exist
     */
    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromFile(String topicConfigName) {
        FileLock lock = this.cometOffsetsLock(topicConfigName);
        long timeout = lock.doExclusively$default$1();
        Function0 & Serializable & scala.Serializable loadOffsets = (Function0 & Serializable & scala.Serializable)() -> {
            Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
            if (!this.settings.storageHandler().exists(cometOffsetsPath)) {
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Cannot load comet offsets: {} file does not exist", new Object[]{cometOffsetsPath});
                }
                return None$.MODULE$;
            }
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Loading comet offsets to path {}", new Object[]{cometOffsetsPath});
            }
            List res = (List)((List)YamlSerializer$.MODULE$.mapper().readValue(this.settings.storageHandler().read(cometOffsetsPath), List.class)).map((Function1 & Serializable & scala.Serializable)str -> {
                String[] tab = new StringOps(Predef$.MODULE$.augmentString(str)).split(',');
                return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(tab[0])).toInt(), new StringOps(Predef$.MODULE$.augmentString(tab[1])).toLong());
            }, List$.MODULE$.canBuildFrom());
            return new Some((Object)res);
        };
        return (Option)lock.doExclusively(timeout, loadOffsets);
    }

    /**
     * Returns the offsets last saved for a topic configuration, using the configured mode.
     *
     * @param topicConfigName name of the topic configuration to look up
     * @return the saved {@code (partition, offset)} pairs, or an empty option when none exist
     * @throws IllegalStateException if the offsets mode is neither STREAM nor FILE
     */
    public Option<List<Tuple2<Object, Object>>> topicCurrentOffsets(String topicConfigName) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            return this.topicCurrentOffsetsFromStream(topicConfigName);
        }
        if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            return this.topicCurrentOffsetsFromFile(topicConfigName);
        }
        // Unchecked, so the method needs no throws clause; still caught by `catch (Exception e)`.
        throw new IllegalStateException("Should never happen");
    }

    /**
     * Renders offsets as a JSON object of the form {@code {"<topic>":{"<partition>": <offset>,...}}}.
     *
     * <p>NOTE(review): {@code topicName} is inserted verbatim, without JSON escaping.
     *
     * @param topicName topic the offsets belong to
     * @param offsets   {@code (partition, offset)} pairs
     * @return the JSON text, or an empty option when {@code offsets} is empty
     */
    public Option<String> offsetsAsJson(String topicName, List<Tuple2<Object, Object>> offsets) {
        if (offsets.isEmpty()) {
            return (Option)None$.MODULE$;
        }
        String offsetsAsString = ((TraversableOnce)offsets.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            int partition = tuple2._1$mcI$sp();
            long partitionOffset = tuple2._2$mcJ$sp();
            return "\"" + partition + "\": " + partitionOffset;
        }, List$.MODULE$.canBuildFrom())).mkString(",");
        return new Some((Object)("{\"" + topicName + "\":{" + offsetsAsString + "}}"));
    }

    public Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch(String topicConfigName, SparkSession session, Settings.KafkaTopicConfig config) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        BoxedUnit boxedUnit3;
        long EARLIEST_OFFSET = -2L;
        List startOffsets = (List)this.topicCurrentOffsets(topicConfigName).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            KafkaConsumer<String, String> consumer = this.newConsumer(config.topicName(), config.accessOptions());
            List partitions = (List)this.extractPartitions(config.topicName(), consumer).map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), EARLIEST_OFFSET), List$.MODULE$.canBuildFrom());
            consumer.close();
            return partitions;
        });
        if (this.logger().underlying().isInfoEnabled()) {
            startOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                KafkaClient.$anonfun$consumeTopicBatch$3(this, topicConfigName, x0$1);
                return BoxedUnit.UNIT;
            });
            boxedUnit3 = BoxedUnit.UNIT;
        } else {
            boxedUnit3 = BoxedUnit.UNIT;
        }
        List<Tuple2<Object, Object>> endOffsets = this.topicEndOffsets(config.topicName(), config.accessOptions());
        if (this.logger().underlying().isInfoEnabled()) {
            endOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                KafkaClient.$anonfun$consumeTopicBatch$4(this, topicConfigName, x0$2);
                return BoxedUnit.UNIT;
            });
            boxedUnit2 = BoxedUnit.UNIT;
        } else {
            boxedUnit2 = BoxedUnit.UNIT;
        }
        scala.collection.immutable.Map withOffsetsTopicOptions = config.accessOptions().$plus$plus((GenTraversableOnce)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"startingOffsets"), this.offsetsAsJson(config.topicName(), (List<Tuple2<Object, Object>>)startOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "earliest")), (List)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"endingOffsets"), this.offsetsAsJson(config.topicName(), endOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "latest")), (List)Nil$.MODULE$)));
        DataFrameReader reader = session.read().format("kafka");
        Dataset df = reader.options((Map)withOffsetsTopicOptions).load().selectExpr(config.fields());
        if (this.logger().underlying().isInfoEnabled()) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.DatasetHelper(df).schemaString());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return new Tuple2((Object)df, endOffsets);
    }

    /**
     * Opens a streaming read on a Kafka topic.
     *
     * @param session Spark session used to read
     * @param config  topic configuration supplying the reader options and the field expressions
     * @return the streaming dataset restricted to the configured fields
     */
    public Dataset<Row> consumeTopicStreaming(SparkSession session, Settings.KafkaTopicConfig config) {
        return session.readStream()
                .format("kafka")
                .options(config.accessOptions())
                .load()
                .selectExpr(config.fields());
    }

    /**
     * Writes a dataset to a Kafka topic.
     *
     * @param config topic configuration supplying the field expressions, writer options and topic name
     * @param df     dataset to write
     */
    public void sinkToTopic(Settings.KafkaTopicConfig config, Dataset<Row> df) {
        Dataset<Row> projected = df.selectExpr(config.fields());
        projected.write()
                .format("kafka")
                .options(config.accessOptions())
                .option("topic", config.topicName())
                .save();
    }

    /**
     * Lambda body used by {@code newConsumer}: logs one consumer property as {@code key=value}.
     *
     * @param $this instance whose logger is used
     * @param x0$1  {@code (key, value)} pair; a {@code null} pair raises {@link MatchError}
     */
    public static final /* synthetic */ void $anonfun$newConsumer$1(KafkaClient $this, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        String key = (String)x0$1._1();
        String value = (String)x0$1._2();
        if ($this.logger().underlying().isInfoEnabled()) {
            $this.logger().underlying().info("\t{}={}", new Object[]{key, value});
        }
    }

    /**
     * Lambda body used by {@code consumeTopicBatch}: logs the start offset of one partition.
     *
     * @param $this             instance whose logger is used
     * @param topicConfigName$3 topic configuration name shown in the message
     * @param x0$1              {@code (partition, offset)} pair; {@code null} raises {@link MatchError}
     */
    public static final /* synthetic */ void $anonfun$consumeTopicBatch$3(KafkaClient $this, String topicConfigName$3, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        int partitionId = x0$1._1$mcI$sp();
        long startOffset = x0$1._2$mcJ$sp();
        if ($this.logger().underlying().isInfoEnabled()) {
            $this.logger().underlying().info("{} start-offset -> {}:{}", new Object[]{topicConfigName$3, BoxesRunTime.boxToInteger((int)partitionId), BoxesRunTime.boxToLong((long)startOffset)});
        }
    }

    /**
     * Lambda body used by {@code consumeTopicBatch}: logs the end offset of one partition.
     *
     * @param $this             instance whose logger is used
     * @param topicConfigName$3 topic configuration name shown in the message
     * @param x0$2              {@code (partition, offset)} pair; {@code null} raises {@link MatchError}
     */
    public static final /* synthetic */ void $anonfun$consumeTopicBatch$4(KafkaClient $this, String topicConfigName$3, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError((Object)x0$2);
        }
        int partitionId = x0$2._1$mcI$sp();
        long endOffset = x0$2._2$mcJ$sp();
        if ($this.logger().underlying().isInfoEnabled()) {
            $this.logger().underlying().info("{} end-offset -> {}:{}", new Object[]{topicConfigName$3, BoxesRunTime.boxToInteger((int)partitionId), BoxesRunTime.boxToLong((long)endOffset)});
        }
    }

    /**
     * Builds the client and prepares the offsets store for the configured mode.
     *
     * <p>The offsets mode is read from {@code settings.comet().kafka().cometOffsetsMode()} and
     * falls back to STREAM when unset. In STREAM mode the offsets topic is created, with
     * {@code cleanup.policy=compact}, if it is not present. In FILE mode the directory named
     * after the offsets topic is created through the storage handler if it does not exist.
     *
     * @param kafkaConfig Kafka configuration; must define a topic entry named {@code "comet_offsets"}
     * @param settings    application settings
     * @throws IllegalStateException if the offsets mode is neither STREAM nor FILE
     */
    public KafkaClient(Settings.KafkaConfig kafkaConfig, Settings settings) {
        this.settings = settings;
        StrictLogging.$init$((StrictLogging)this);
        DatasetLogging.$init$(this);
        this.cometOffsetsMode = (Mode)settings.comet().kafka().cometOffsetsMode().map((Function1 & Serializable & scala.Serializable)value -> Mode$.MODULE$.fromString((String)value)).getOrElse((Function0 & Serializable & scala.Serializable)() -> Mode$STREAM$.MODULE$);
        this.serverOptions = kafkaConfig.serverOptions();
        this.cometOffsetsConfig = (Settings.KafkaTopicConfig)kafkaConfig.topics().apply((Object)"comet_offsets");
        this.props = new Properties();
        this.serverOptions().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            String k = (String)tuple2._1();
            String v = (String)tuple2._2();
            return this.props().put(k, v);
        });
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            this.createTopicIfNotPresent(new NewTopic(this.cometOffsetsConfig().topicName(), this.cometOffsetsConfig().partitions(), this.cometOffsetsConfig().replicationFactor()), (scala.collection.immutable.Map<String, String>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)"compact")}))));
        } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            // In FILE mode the "topic name" of the offsets configuration is used as a directory.
            Path offsetsDir = new Path(this.cometOffsetsConfig().topicName());
            if (!settings.storageHandler().exists(offsetsDir)) {
                settings.storageHandler().mkdirs(offsetsDir);
            }
        } else {
            throw new IllegalStateException("Should never happen");
        }
    }
}

