/*
 * Decompiled with CFR 0.152.
 */
package ai.starlake.utils.kafka;

import ai.starlake.config.Settings;
import ai.starlake.schema.model.Mode;
import ai.starlake.schema.model.Mode$;
import ai.starlake.schema.model.Mode$FILE$;
import ai.starlake.schema.model.Mode$STREAM$;
import ai.starlake.utils.FileLock;
import ai.starlake.utils.YamlSerializer$;
import ai.starlake.utils.kafka.KafkaClient$;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

@ScalaSignature(bytes="\u0006\u0001\t\rd\u0001B\u0011#\u0001-B\u0001\u0002\u0015\u0001\u0003\u0002\u0003\u0006I!\u0015\u0005\tG\u0002\u0011\t\u0011)A\u0006I\")\u0001\u000e\u0001C\u0001S\"9q\u000e\u0001b\u0001\n\u0003\u0001\bBB=\u0001A\u0003%\u0011\u000fC\u0004{\u0001\t\u0007I\u0011A>\t\u000f\u0005=\u0001\u0001)A\u0005y\"I\u0011\u0011\u0003\u0001C\u0002\u0013\u0005\u00111\u0003\u0005\t\u00037\u0001\u0001\u0015!\u0003\u0002\u0016!I\u0011Q\u0004\u0001C\u0002\u0013\u0005\u0011q\u0004\u0005\t\u0003[\u0001\u0001\u0015!\u0003\u0002\"!Q\u0011q\u0006\u0001\t\u0006\u0004%\t!!\r\t\u000f\u0005\u0015\u0003\u0001\"\u0001\u0002H!9\u0011q\n\u0001\u0005\u0002\u0005E\u0003bBA,\u0001\u0011\u0005\u0011\u0011\f\u0005\b\u0003S\u0002A\u0011AA6\u0011\u001d\t9\n\u0001C\u0005\u00033Cq!a/\u0001\t\u0013\ti\fC\u0004\u0002D\u0002!I!!2\t\u000f\u0005%\u0007\u0001\"\u0001\u0002L\"9\u0011q\u001b\u0001\u0005\u0002\u0005e\u0007bBAs\u0001\u0011%\u0011q\u001d\u0005\b\u0003c\u0004A\u0011BAz\u0011\u001d\ty\u0010\u0001C\u0005\u0005\u0003AqA!\u0002\u0001\t\u0003\u00119\u0001C\u0004\u0003\f\u0001!\tA!\u0004\t\u000f\tU\u0001\u0001\"\u0001\u0003\u0018!9!Q\t\u0001\u0005\u0002\t\u001dsa\u0002B(E!\u0005!\u0011\u000b\u0004\u0007C\tB\tAa\u0015\t\r!tB\u0011\u0001B+\u0011\u001d\u00119F\bC\u0001\u00053\u00121bS1gW\u0006\u001cE.[3oi*\u00111\u0005J\u0001\u0006W\u000647.\u0019\u0006\u0003K\u0019\nQ!\u001e;jYNT!a\n\u0015\u0002\u0011M$\u0018M\u001d7bW\u0016T\u0011!K\u0001\u0003C&\u001c\u0001aE\u0003\u0001YIb\u0004\n\u0005\u0002.a5\taFC\u00010\u0003\u0015\u00198-\u00197b\u0013\t\tdF\u0001\u0004B]f\u0014VM\u001a\t\u0003gij\u0011\u0001\u000e\u0006\u0003kY\nAb]2bY\u0006dwnZ4j]\u001eT!a\u000e\u001d\u0002\u0011QL\b/Z:bM\u0016T\u0011!O\u0001\u0004G>l\u0017BA\u001e5\u00055\u0019FO]5di2{wmZ5oOB\u0011QHR\u0007\u0002})\u0011q\bQ\u0001\u0004gFd'BA!C\u0003\u0015\u0019\b/\u0019:l\u0015\t\u0019E)\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u000b\u0006\u0019qN]4\n\u0005\u001ds$A\u0004#bi\u0006\u001cX\r\u001e'pO\u001eLgn\u001a\t\u0003\u0013:k\u0011A\u0013\u0006\u0003\u00172\u000bA\u0001\\1oO*\tQ*\u0001\u0003kCZ\f\u0017BA(K\u00055\tU\u000f^8DY>\u001cX-\u00192mK\u0006Y1.\u00194lC\u000e{gNZ5h!\t\u0011\u0006M\u0004\u0002T;:\u0011Ak\u0017\b\u0003+js!AV-\u000e\u0003]S!\u0001\u0017\u0016\u0002\rq\u0012xn\u001c;?\u0013\u0005I\u0013BA\u0014)\u0013\taf%\u0001\u0004d_:4\u0017nZ\u0005\u0003=~\u000b\u0001bU3ui&twm\u001d\u0006\u00039\u001aJ!!\u00192\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0006\u0003=~\u000b\u0001b]3ui&twm\u001d\t\u0003K\u001al\u0011aX\u0005\u0003O~\u0013\u0001bU3ui&twm]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005)tGCA6n!\ta\u0007!D\u0001#\u0011\u0015\u00197\u0001q\u0001e\u0011\u0015\u00016\u00011\u0001R\u0003A\u0019w.\\3u\u001f\u001a47/\u001a;t\u001b>$W-F\u0001r!\t\u0011x/D\u0001t\u0015\t!X/A\u0003n_\u0012,GN\u0003\u0002wM\u000511o\u00195f[\u0006L!\u0001_:\u0003\t5{G-Z\u0001\u0012G>lW\r^(gMN,Go]'pI\u0016\u0004\u0013!D:feZ,'o\u00149uS>t7/F\u0001}!\u001di\u00181AA\u0005\u0003\u0013q!A`@\u0011\u0005Ys\u0013bAA\u0001]\u00051\u0001K]3eK\u001aLA!!\u0002\u0002\b\t\u0019Q*\u00199\u000b\u0007\u0005\u0005a\u0006E\u0002~\u0003\u0017IA!!\u0004\u0002\b\t11\u000b\u001e:j]\u001e\fab]3sm\u0016\u0014x\n\u001d;j_:\u001c\b%\u0001\nd_6,Go\u00144gg\u0016$8oQ8oM&<WCAA\u000b!\r\u0011\u0016qC\u0005\u0004\u00033\u0011'\u0001E&bM.\fGk\u001c9jG\u000e{gNZ5h\u0003M\u0019w.\\3u\u001f\u001a47/\u001a;t\u0007>tg-[4!\u0003]\u0019XM\u001d<fe>\u0003H/[8ogB\u0013x\u000e]3si&,7/\u0006\u0002\u0002\"A!\u00111EA\u0015\u001b\t\t)CC\u0002\u0002(1\u000bA!\u001e;jY&!\u00111FA\u0013\u0005)\u0001&o\u001c9feRLWm]\u0001\u0019g\u0016\u0014h/\u001a:PaRLwN\\:Qe>\u0004XM\u001d;jKN\u0004\u0013AB2mS\u0016tG/\u0006\u0002\u00024A!\u0011QGA!\u001b\t\t9D\u0003\u0003\u0002:\u0005m\u0012!B1e[&t'\u0002BA\u001f\u0003\u007f\tqa\u00197jK:$8O\u0003\u0002$\u0005&!\u00111IA\u001c\u0005-\tE-\\5o\u00072LWM\u001c;\u0002\u000b\rdwn]3\u0015\u0005\u0005%\u0003cA\u0017\u0002L%\u0019\u0011Q\n\u0018\u0003\tUs\u0017\u000e^\u0001\fI\u0016dW\r^3U_BL7\r\u0006\u0003\u0002J\u0005M\u0003bBA+\u001d\u0001\u0007\u0011\u0011B\u0001\ni>\u0004\u0018n\u0019(b[\u0016\fqc\u0019:fCR,Gk\u001c9jG&3gj\u001c;Qe\u0016\u001cXM\u001c;\u0015\r\u0005%\u00131LA3\u0011\u001d\tif\u0004a\u0001\u0003?\nQ\u0001^8qS\u000e\u0004B!!\u000e\u0002b%!\u00111MA\u001c\u0005!qUm\u001e+pa&\u001c\u0007BBA4\u001f\u0001\u0007A0\u0001\u0003d_:4\u0017a\u0004;pa&\u001cWI\u001c3PM\u001a\u001cX\r^:\u0015\r\u00055\u0014\u0011SAJ!\u0019\ty'!\u001f\u0002\u00009!\u0011\u0011OA;\u001d\r1\u00161O\u0005\u0002_%\u0019\u0011q\u000f\u0018\u0002\u000fA\f7m[1hK&!\u00111PA?\u0005\u0011a\u0015n\u001d;\u000b\u0007\u0005]d\u0006E\u0004.\u0003\u0003\u000b))a#\n\u0007\u0005\reF\u0001\u0004UkBdWM\r\t\u0004[\u0005\u001d\u0015bAAE]\t\u0019\u0011J\u001c;\u0011\u00075\ni)C\u0002\u0002\u0010:\u0012A\u0001T8oO\"9\u0011Q\u000b\tA\u0002\u0005%\u0001BBAK!\u0001\u0007A0A\u0007bG\u000e,7o](qi&|gn]\u0001\u0012Kb$(/Y2u!\u0006\u0014H/\u001b;j_:\u001cHCBAN\u0003W\u000bi\u000b\u0005\u0004\u0002\u001e\u0006e\u0014q\u0014\b\u0004[\u0005U\u0004\u0003BAQ\u0003Ok!!a)\u000b\t\u0005\u0015\u0016qH\u0001\u0007G>lWn\u001c8\n\t\u0005%\u00161\u0015\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0011\u001d\t)&\u0005a\u0001\u0003\u0013Aq!a,\u0012\u0001\u0004\t\t,\u0001\u0005d_:\u001cX/\\3s!!\t\u0019,a.\u0002\n\u0005%QBAA[\u0015\u0011\ty+a\u000f\n\t\u0005e\u0016Q\u0017\u0002\u000e\u0017\u000647.Y\"p]N,X.\u001a:\u0002\u00179,woQ8ogVlWM\u001d\u000b\u0007\u0003c\u000by,!1\t\u000f\u0005U#\u00031\u0001\u0002\n!1\u0011Q\u0013\nA\u0002q\f!BY;jY\u0012\u0004&o\u001c9t)\u0011\t\t#a2\t\r\u0005U5\u00031\u0001}\u0003A!x\u000e]5d'\u00064Xm\u00144gg\u0016$8\u000f\u0006\u0005\u0002J\u00055\u0017\u0011[Aj\u0011\u001d\ty\r\u0006a\u0001\u0003\u0013\tq\u0002^8qS\u000e\u001cuN\u001c4jO:\u000bW.\u001a\u0005\u0007\u0003+#\u0002\u0019\u0001?\t\u000f\u0005UG\u00031\u0001\u0002n\u00059qN\u001a4tKR\u001c\u0018\u0001F1e[&tGk\u001c9jGB\u000b'\u000f^5uS>t7\u000f\u0006\u0003\u0002\\\u0006\r\bCBA8\u0003s\ni\u000e\u0005\u0003\u0002\"\u0006}\u0017\u0002BAq\u0003G\u0013!\u0003V8qS\u000e\u0004\u0016M\u001d;ji&|g.\u00138g_\"9\u0011QK\u000bA\u0002\u0005%\u0011!\b;pa&\u001c7)\u001e:sK:$xJ\u001a4tKR\u001chI]8n'R\u0014X-Y7\u0015\t\u0005%\u0018q\u001e\t\u0006[\u0005-\u0018QN\u0005\u0004\u0003[t#AB(qi&|g\u000eC\u0004\u0002PZ\u0001\r!!\u0003\u0002!\r|W.\u001a;PM\u001a\u001cX\r^:M_\u000e\\G\u0003BA{\u0003{\u0004B!a>\u0002z6\tA%C\u0002\u0002|\u0012\u0012\u0001BR5mK2{7m\u001b\u0005\b\u0003\u001f<\u0002\u0019AA\u0005\u0003m!x\u000e]5d\u0007V\u0014(/\u001a8u\u001f\u001a47/\u001a;t\rJ|WNR5mKR!\u0011\u0011\u001eB\u0002\u0011\u001d\ty\r\u0007a\u0001\u0003\u0013\t1\u0003^8qS\u000e\u001cUO\u001d:f]R|eMZ:fiN$B!!;\u0003\n!9\u0011qZ\rA\u0002\u0005%\u0011!D8gMN,Go]!t\u0015N|g\u000e\u0006\u0004\u0003\u0010\tE!1\u0003\t\u0006[\u0005-\u0018\u0011\u0002\u0005\b\u0003+R\u0002\u0019AA\u0005\u0011\u001d\t)N\u0007a\u0001\u0003[\n\u0011cY8ogVlW\rV8qS\u000e\u0014\u0015\r^2i)!\u0011IBa\u000e\u0003:\t\r\u0003cB\u0017\u0002\u0002\nm\u0011Q\u000e\t\u0005\u0005;\u0011\tD\u0004\u0003\u0003 \t=b\u0002\u0002B\u0011\u0005[qAAa\t\u0003,9!!Q\u0005B\u0015\u001d\r1&qE\u0005\u0002\u000b&\u00111\tR\u0005\u0003\u0003\nK!a\u0010!\n\u0007\u0005]d(\u0003\u0003\u00034\tU\"!\u0003#bi\u00064%/Y7f\u0015\r\t9H\u0010\u0005\b\u0003\u001f\\\u0002\u0019AA\u0005\u0011\u001d\u0011Yd\u0007a\u0001\u0005{\tqa]3tg&|g\u000eE\u0002>\u0005\u007fI1A!\u0011?\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0011\u0019a6\u00041\u0001\u0002\u0016\u0005Y1/\u001b8l)>$v\u000e]5d)\u0019\tIE!\u0013\u0003L!1A\f\ba\u0001\u0003+AqA!\u0014\u001d\u0001\u0004\u0011Y\"\u0001\u0002eM\u0006Y1*\u00194lC\u000ec\u0017.\u001a8u!\tagd\u0005\u0002\u001fYQ\u0011!\u0011K\u0001\u0016G>t7/^7f)>\u0004\u0018nY*ue\u0016\fW.\u001b8h)\u0019\u0011YFa\u0018\u0003bQ!!1\u0004B/\u0011\u0015\u0019\u0007\u0005q\u0001e\u0011\u001d\u0011Y\u0004\ta\u0001\u0005{Aa\u0001\u0018\u0011A\u0002\u0005U\u0001")
public class KafkaClient
implements StrictLogging,
DatasetLogging,
AutoCloseable {
    private AdminClient client;
    private final Settings settings;
    private final Mode cometOffsetsMode;
    private final scala.collection.immutable.Map<String, String> serverOptions;
    private final Settings.KafkaTopicConfig cometOffsetsConfig;
    private final Properties serverOptionsProperties;
    private final Logger logger;
    private volatile boolean bitmap$0;

    public static Dataset<Row> consumeTopicStreaming(SparkSession sparkSession, Settings.KafkaTopicConfig kafkaTopicConfig, Settings settings) {
        return KafkaClient$.MODULE$.consumeTopicStreaming(sparkSession, kafkaTopicConfig, settings);
    }

    @Override
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> ds) {
        return DatasetLogging.DatasetHelper$(this, ds);
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger x$1) {
        this.logger = x$1;
    }

    public Mode cometOffsetsMode() {
        return this.cometOffsetsMode;
    }

    public scala.collection.immutable.Map<String, String> serverOptions() {
        return this.serverOptions;
    }

    public Settings.KafkaTopicConfig cometOffsetsConfig() {
        return this.cometOffsetsConfig;
    }

    public Properties serverOptionsProperties() {
        return this.serverOptionsProperties;
    }

    private AdminClient client$lzycompute() {
        KafkaClient kafkaClient = this;
        synchronized (kafkaClient) {
            if (!this.bitmap$0) {
                this.client = AdminClient.create((Properties)this.serverOptionsProperties());
                this.bitmap$0 = true;
            }
        }
        return this.client;
    }

    public AdminClient client() {
        if (!this.bitmap$0) {
            return this.client$lzycompute();
        }
        return this.client;
    }

    @Override
    public void close() {
        this.client().close();
    }

    public void deleteTopic(String topicName) {
        BoxedUnit boxedUnit;
        boolean found = ((Set)this.client().listTopics().names().get()).contains(topicName);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(((TraversableOnce)JavaConverters$.MODULE$.asScalaSetConverter((Set)this.client().listTopics().names().get()).asScala()).toSet().mkString("\n"));
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        if (found) {
            BoxedUnit boxedUnit2;
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Deleting topic {}", new Object[]{topicName});
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            this.client().deleteTopics(JavaConverters$.MODULE$.asJavaCollectionConverter((Iterable)new .colon.colon((Object)topicName, (List)Nil$.MODULE$)).asJavaCollection());
            return;
        }
    }

    public void createTopicIfNotPresent(NewTopic topic, scala.collection.immutable.Map<String, String> conf) {
        boolean found = ((Set)this.client().listTopics().names().get()).contains(topic.name());
        if (!found) {
            this.client().createTopics(Collections.singleton(topic.configs((java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(conf).asJava()))).all().get();
            return;
        }
    }

    public List<Tuple2<Object, Object>> topicEndOffsets(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        Try try_ = Try$.MODULE$.apply((Function0 & Serializable & scala.Serializable)() -> {
            KafkaConsumer<String, String> consumer = this.newConsumer(topicName, accessOptions);
            List<TopicPartition> partitions = this.extractPartitions(topicName, consumer);
            consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
            consumer.seekToEnd((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
            List result = (List)partitions.map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), consumer.position(p)), List$.MODULE$.canBuildFrom());
            consumer.close();
            return result;
        });
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable e = failure.exception();
            e.printStackTrace();
            throw e;
        }
        if (try_ instanceof Success) {
            Success success = (Success)try_;
            List value = (List)success.value();
            return value;
        }
        throw new MatchError((Object)try_);
    }

    private List<TopicPartition> extractPartitions(String topicName, KafkaConsumer<String, String> consumer) {
        List partitions = ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(consumer.partitionsFor(topicName)).asScala()).map((Function1 & Serializable & scala.Serializable)info -> new TopicPartition(topicName, info.partition()), Buffer$.MODULE$.canBuildFrom())).toList();
        return partitions;
    }

    private KafkaConsumer<String, String> newConsumer(String topicName, scala.collection.immutable.Map<String, String> accessOptions) {
        BoxedUnit boxedUnit;
        Properties props = this.buildProps(accessOptions);
        if (this.logger().underlying().isInfoEnabled()) {
            BoxedUnit boxedUnit2;
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("access options for topic {} ==>", new Object[]{topicName});
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            ((IterableLike)JavaConverters$.MODULE$.propertiesAsScalaMapConverter(props).asScala()).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                KafkaClient.$anonfun$newConsumer$1(this, x0$1);
                return BoxedUnit.UNIT;
            });
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        KafkaConsumer consumer = new KafkaConsumer(props);
        return consumer;
    }

    private Properties buildProps(scala.collection.immutable.Map<String, String> accessOptions) {
        Properties props = new Properties();
        accessOptions.foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        return props;
    }

    public void topicSaveOffsets(String topicConfigName, scala.collection.immutable.Map<String, String> accessOptions, List<Tuple2<Object, Object>> offsets) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            Properties props = this.buildProps(accessOptions);
            KafkaProducer producer = new KafkaProducer(props);
            offsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                Tuple2 tuple2 = x0$1;
                if (tuple2 != null) {
                    int partition = tuple2._1$mcI$sp();
                    long offset = tuple2._2$mcJ$sp();
                    return producer.send(new ProducerRecord(this.cometOffsetsConfig().topicName(), (Object)new StringBuilder(1).append(topicConfigName).append("/").append(partition).toString(), (Object)String.valueOf(BoxesRunTime.boxToLong((long)offset))));
                }
                throw new MatchError((Object)tuple2);
            });
            producer.close();
            return;
        }
        if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            FileLock qual$1 = this.cometOffsetsLock(topicConfigName);
            long x$1 = qual$1.doExclusively$default$1();
            JFunction0.mcV.sp & Serializable & scala.Serializable x$5 = (JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                BoxedUnit boxedUnit;
                Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Saving comet offsets to path {}", new Object[]{cometOffsetsPath});
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                String x$2 = YamlSerializer$.MODULE$.serializeObject(offsets.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
                    Tuple2 tuple2 = x0$2;
                    if (tuple2 != null) {
                        int partition = tuple2._1$mcI$sp();
                        long offset = tuple2._2$mcJ$sp();
                        return new StringBuilder(1).append(Integer.toString(partition)).append(",").append(Long.toString(offset)).toString();
                    }
                    throw new MatchError((Object)tuple2);
                }, List$.MODULE$.canBuildFrom()));
                Path x$3 = cometOffsetsPath;
                Charset x$4 = $this.settings.storageHandler().write$default$3(x$2, x$3);
                $this.settings.storageHandler().write(x$2, x$3, x$4);
            };
            BoxedUnit cfr_ignored_0 = (BoxedUnit)qual$1.doExclusively(x$1, x$5);
            return;
        }
        throw new Exception("Should never happen");
    }

    public List<TopicPartitionInfo> adminTopicPartitions(String topicName) {
        return ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(((TopicDescription)((java.util.Map)this.client().describeTopics(Collections.singleton(topicName)).allTopicNames().get()).get(topicName)).partitions()).asScala()).toList();
    }

    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromStream(String topicConfigName) {
        Properties props = new Properties();
        this.cometOffsetsConfig().allAccessOptions(this.settings).foreach((Function1 & Serializable & scala.Serializable)option -> props.put(option._1(), option._2()));
        KafkaConsumer consumer = new KafkaConsumer(props);
        List<TopicPartition> partitions = this.extractPartitions(this.cometOffsetsConfig().topicName(), (KafkaConsumer<String, String>)consumer);
        consumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
        consumer.seekToBeginning((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(partitions).asJava());
        scala.collection.mutable.Map offsets = Map$.MODULE$.empty();
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
        while (records != null && !records.isEmpty()) {
            ((IterableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(records.records(this.cometOffsetsConfig().topicName())).asScala()).foreach((Function1 & Serializable & scala.Serializable)r -> (scala.collection.mutable.Map)offsets.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(r.key()), r.value())));
            records = consumer.poll(Duration.ofMillis(100L));
        }
        Option res = ((TraversableLike)offsets.keys().map((Function1 & Serializable & scala.Serializable)k -> {
            String[] tab = new StringOps(Predef$.MODULE$.augmentString(k)).split('/');
            return new Tuple3((Object)tab[0], (Object)tab[1], offsets.apply(k));
        }, Iterable$.MODULE$.canBuildFrom())).groupBy((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple3 tuple3 = x0$1;
            if (tuple3 != null) {
                String topic = (String)tuple3._1();
                return topic;
            }
            throw new MatchError((Object)tuple3);
        }).mapValues((Function1 & Serializable & scala.Serializable)x$1 -> ((TraversableOnce)x$1.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple3 tuple3 = x0$2;
            if (tuple3 != null) {
                String partition = (String)tuple3._2();
                String offset = (String)tuple3._3();
                return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(partition)).toInt(), new StringOps(Predef$.MODULE$.augmentString(offset)).toLong());
            }
            throw new MatchError((Object)tuple3);
        }, Iterable$.MODULE$.canBuildFrom())).toList()).get((Object)topicConfigName);
        return res;
    }

    private FileLock cometOffsetsLock(String topicConfigName) {
        Path lockPath = new Path(this.settings.comet().lock().path(), new StringBuilder(19).append("comet_offsets_").append(topicConfigName).append(".lock").toString());
        return new FileLock(lockPath, this.settings.storageHandler());
    }

    private Option<List<Tuple2<Object, Object>>> topicCurrentOffsetsFromFile(String topicConfigName) {
        FileLock qual$1 = this.cometOffsetsLock(topicConfigName);
        long x$1 = qual$1.doExclusively$default$1();
        Function0 & Serializable & scala.Serializable x$2 = (Function0 & Serializable & scala.Serializable)() -> {
            BoxedUnit boxedUnit;
            Path cometOffsetsPath = new Path(this.cometOffsetsConfig().topicName(), topicConfigName);
            if ($this.settings.storageHandler().exists(cometOffsetsPath)) {
                BoxedUnit boxedUnit2;
                if (this.logger().underlying().isInfoEnabled()) {
                    this.logger().underlying().info("Loading comet offsets to path {}", new Object[]{cometOffsetsPath});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                List res = (List)((List)YamlSerializer$.MODULE$.mapper().readValue($this.settings.storageHandler().read(cometOffsetsPath, $this.settings.storageHandler().read$default$2()), List.class)).map((Function1 & Serializable & scala.Serializable)str -> {
                    String[] tab = new StringOps(Predef$.MODULE$.augmentString(str)).split(',');
                    return new Tuple2.mcIJ.sp(new StringOps(Predef$.MODULE$.augmentString(tab[0])).toInt(), new StringOps(Predef$.MODULE$.augmentString(tab[1])).toLong());
                }, List$.MODULE$.canBuildFrom());
                return new Some((Object)res);
            }
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Cannot load comet offsets: {} file does not exist", new Object[]{cometOffsetsPath});
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            return None$.MODULE$;
        };
        return (Option)qual$1.doExclusively(x$1, x$2);
    }

    public Option<List<Tuple2<Object, Object>>> topicCurrentOffsets(String topicConfigName) {
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            return this.topicCurrentOffsetsFromStream(topicConfigName);
        }
        if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            return this.topicCurrentOffsetsFromFile(topicConfigName);
        }
        throw new Exception("Should never happen");
    }

    public Option<String> offsetsAsJson(String topicName, List<Tuple2<Object, Object>> offsets) {
        if (offsets.isEmpty()) {
            return None$.MODULE$;
        }
        String offsetsAsString = ((TraversableOnce)offsets.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                int partition = tuple2._1$mcI$sp();
                long partitionOffset = tuple2._2$mcJ$sp();
                return new StringBuilder(4).append("\"").append(partition).append("\": ").append(partitionOffset).toString();
            }
            throw new MatchError((Object)tuple2);
        }, List$.MODULE$.canBuildFrom())).mkString(",");
        return new Some((Object)new StringBuilder(7).append("{\"").append(topicName).append("\":{").append(offsetsAsString).append("}}").toString());
    }

    public Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch(String topicConfigName, SparkSession session, Settings.KafkaTopicConfig config) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        BoxedUnit boxedUnit3;
        BoxedUnit boxedUnit4;
        BoxedUnit boxedUnit5;
        long EARLIEST_OFFSET = -2L;
        List startOffsets = (List)this.topicCurrentOffsets(topicConfigName).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            KafkaConsumer<String, String> consumer = this.newConsumer(config.topicName(), config.allAccessOptions($this.settings));
            List partitions = (List)this.extractPartitions(config.topicName(), consumer).map((Function1 & Serializable & scala.Serializable)p -> new Tuple2.mcIJ.sp(p.partition(), EARLIEST_OFFSET), List$.MODULE$.canBuildFrom());
            consumer.close();
            return partitions;
        });
        if (this.logger().underlying().isInfoEnabled()) {
            startOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                KafkaClient.$anonfun$consumeTopicBatch$3(this, topicConfigName, x0$1);
                return BoxedUnit.UNIT;
            });
            boxedUnit5 = BoxedUnit.UNIT;
        } else {
            boxedUnit5 = BoxedUnit.UNIT;
        }
        List<Tuple2<Object, Object>> endOffsets = this.topicEndOffsets(config.topicName(), config.allAccessOptions(this.settings));
        if (this.logger().underlying().isInfoEnabled()) {
            endOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
                KafkaClient.$anonfun$consumeTopicBatch$4(this, topicConfigName, x0$2);
                return BoxedUnit.UNIT;
            });
            boxedUnit4 = BoxedUnit.UNIT;
        } else {
            boxedUnit4 = BoxedUnit.UNIT;
        }
        scala.collection.immutable.Map withOffsetsTopicOptions = config.allAccessOptions(this.settings).$plus$plus((GenTraversableOnce)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"startingOffsets"), this.offsetsAsJson(config.topicName(), (List<Tuple2<Object, Object>>)startOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "earliest")), (List)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"endingOffsets"), this.offsetsAsJson(config.topicName(), endOffsets).getOrElse((Function0 & Serializable & scala.Serializable)() -> "latest")), (List)Nil$.MODULE$)));
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(new StringBuilder(24).append("withOffsetsTopicOptions:").append(withOffsetsTopicOptions.toString()).toString());
            boxedUnit3 = BoxedUnit.UNIT;
        } else {
            boxedUnit3 = BoxedUnit.UNIT;
        }
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(new StringBuilder(40).append("settings.comet.kafka.sparkServerOptions:").append(this.settings.comet().kafka().sparkServerOptions().toString()).toString());
            boxedUnit2 = BoxedUnit.UNIT;
        } else {
            boxedUnit2 = BoxedUnit.UNIT;
        }
        DataFrameReader reader = session.read().format("kafka");
        Dataset df = reader.options((Map)withOffsetsTopicOptions).options(this.settings.comet().kafka().sparkServerOptions()).load().selectExpr(config.fields());
        if (this.logger().underlying().isInfoEnabled()) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.DatasetHelper(df).schemaString());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return new Tuple2((Object)df, endOffsets);
    }

    public void sinkToTopic(Settings.KafkaTopicConfig config, Dataset<Row> df) {
        df.printSchema();
        df.write().format("kafka").options(config.allAccessOptions(this.settings)).option("topic", config.topicName()).save();
    }

    public static final /* synthetic */ void $anonfun$newConsumer$1(KafkaClient $this, Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 != null) {
            String k = (String)tuple2._1();
            String v = (String)tuple2._2();
            if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info("\t{}={}", new Object[]{k, v});
                return;
            }
            return;
        }
        throw new MatchError((Object)tuple2);
    }

    public static final /* synthetic */ void $anonfun$consumeTopicBatch$3(KafkaClient $this, String topicConfigName$3, Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 != null) {
            int partition = tuple2._1$mcI$sp();
            long offsetStart = tuple2._2$mcJ$sp();
            if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info("{} start-offset -> {}:{}", new Object[]{topicConfigName$3, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)offsetStart)});
                return;
            }
            return;
        }
        throw new MatchError((Object)tuple2);
    }

    public static final /* synthetic */ void $anonfun$consumeTopicBatch$4(KafkaClient $this, String topicConfigName$3, Tuple2 x0$2) {
        Tuple2 tuple2 = x0$2;
        if (tuple2 != null) {
            int partition = tuple2._1$mcI$sp();
            long offsetEnd = tuple2._2$mcJ$sp();
            if ($this.logger().underlying().isInfoEnabled()) {
                $this.logger().underlying().info("{} end-offset -> {}:{}", new Object[]{topicConfigName$3, BoxesRunTime.boxToInteger((int)partition), BoxesRunTime.boxToLong((long)offsetEnd)});
                return;
            }
            return;
        }
        throw new MatchError((Object)tuple2);
    }

    public KafkaClient(Settings.KafkaConfig kafkaConfig, Settings settings) {
        this.settings = settings;
        StrictLogging.$init$((StrictLogging)this);
        DatasetLogging.$init$(this);
        this.cometOffsetsMode = (Mode)settings.comet().kafka().cometOffsetsMode().map((Function1 & Serializable & scala.Serializable)value -> Mode$.MODULE$.fromString((String)value)).getOrElse((Function0 & Serializable & scala.Serializable)() -> Mode$STREAM$.MODULE$);
        this.serverOptions = kafkaConfig.serverOptions();
        this.cometOffsetsConfig = (Settings.KafkaTopicConfig)kafkaConfig.topics().apply((Object)"comet_offsets");
        this.serverOptionsProperties = new Properties();
        this.serverOptions().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                String k = (String)tuple2._1();
                String v = (String)tuple2._2();
                return this.serverOptionsProperties().put(k, v);
            }
            throw new MatchError((Object)tuple2);
        });
        Mode mode = this.cometOffsetsMode();
        if (((Object)Mode$STREAM$.MODULE$).equals(mode)) {
            this.createTopicIfNotPresent(new NewTopic(this.cometOffsetsConfig().topicName(), this.cometOffsetsConfig().partitions(), this.cometOffsetsConfig().replicationFactor()), (scala.collection.immutable.Map<String, String>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cleanup.policy"), (Object)"compact")}))));
        } else if (((Object)Mode$FILE$.MODULE$).equals(mode)) {
            if (!settings.storageHandler().exists(new Path(this.cometOffsetsConfig().topicName()))) {
                BoxesRunTime.boxToBoolean((boolean)settings.storageHandler().mkdirs(new Path(this.cometOffsetsConfig().topicName())));
            }
        } else {
            throw new Exception("Should never happen");
        }
    }
}

