/*
 * Decompiled with CFR 0.152.
 */
package ai.chronon.spark.streaming;

import ai.chronon.online.DataStream;
import ai.chronon.online.StreamBuilder;
import ai.chronon.online.TopicInfo;
import ai.chronon.spark.streaming.TopicChecker$;
import java.io.Serializable;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.runtime.ScalaRunTime$;

/**
 * Singleton {@link StreamBuilder} that builds a {@link DataStream} backed by a
 * Spark structured-streaming read of a Kafka topic.
 *
 * <p>The single instance is exposed through {@link #MODULE$}; the constructor
 * is private.
 */
public final class KafkaStreamBuilder$
implements StreamBuilder {
    public static final KafkaStreamBuilder$ MODULE$ = new KafkaStreamBuilder$();
    private static transient Logger logger;
    /** Set to {@code true} once {@link #logger} has been assigned. */
    private static volatile transient boolean bitmap$trans$0;
    /**
     * Sessions on which the query-logging listener has already been registered.
     * Keys are held weakly so this set does not keep a session reachable, and
     * the set is synchronized because {@link #from} may be invoked from
     * several threads.
     */
    private static final Set<SparkSession> LISTENER_SESSIONS =
            Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<SparkSession, Boolean>()));

    /**
     * Slow path of {@link #logger()}: creates the logger while holding this
     * instance's monitor, unless it has already been created.
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!bitmap$trans$0) {
                logger = LoggerFactory.getLogger(this.getClass());
                // Publish the flag only after the logger field has been written.
                bitmap$trans$0 = true;
            }
        }
        return logger;
    }

    /**
     * Returns the logger for this class, creating it on first use.
     *
     * @return the lazily-initialised logger
     */
    public Logger logger() {
        if (bitmap$trans$0) {
            return logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Builds a streaming read of the Kafka topic described by {@code topicInfo}.
     *
     * <p>The bootstrap address is taken from {@code topicInfo.params()}: the
     * {@code "bootstrap"} entry when present, otherwise {@code "host"} followed
     * by {@code ":" + port} when a {@code "port"} entry exists. The topic is
     * checked through {@code TopicChecker.topicShouldExist} before the stream is
     * defined, and only the {@code value} column of the Kafka source is selected.
     *
     * <p>A listener that logs query start, progress and termination is attached
     * to the session's stream manager. It is attached at most once per session:
     * re-registering it on every call would make each query event be logged
     * once per previous call and would accumulate listeners on the session.
     *
     * @param topicInfo topic name and connection parameters
     * @param session   Spark session used to define the streaming read
     * @param conf      not read by this implementation; connection settings come
     *                  from {@code topicInfo.params()}
     * @return a {@link DataStream} wrapping the Kafka dataset, the partition
     *         count reported by {@code TopicChecker.getPartitions}, and
     *         {@code topicInfo}
     */
    public DataStream from(TopicInfo topicInfo, SparkSession session, Map<String, String> conf) {
        // Raw Scala map, as in the decompiled original, to keep the interop calls below unchanged.
        Map params = topicInfo.params();
        // The fallback is passed lazily, so "host" is only looked up when "bootstrap" is absent.
        String bootstrap = (String)params.getOrElse((Object)"bootstrap", (Function0 & Serializable)() -> {
            Object portSuffix = params.get((Object)"port")
                    .map((Function1 & Serializable)port -> ":" + (String)port)
                    .getOrElse((Function0 & Serializable)() -> "");
            return (String)params.apply((Object)"host") + portSuffix;
        });
        String topic = topicInfo.name();
        TopicChecker$.MODULE$.topicShouldExist(topic, bootstrap);
        this.registerQueryListenerOnce(session);
        // NOTE(review): "enable.auto.commit" is passed without the "kafka." prefix; confirm
        // against the Spark Kafka source documentation whether this option has any effect.
        Dataset df = session.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", bootstrap)
                .option("subscribe", topic)
                .option("enable.auto.commit", "true")
                .load()
                .selectExpr((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{"value"}));
        return new DataStream(df, TopicChecker$.MODULE$.getPartitions(topic, bootstrap), topicInfo);
    }

    /**
     * Attaches the query-logging listener to {@code session} unless this
     * builder has already attached one to that session.
     */
    private void registerQueryListenerOnce(SparkSession session) {
        // Set.add returns false when the session was already recorded.
        if (!LISTENER_SESSIONS.add(session)) {
            return;
        }
        try {
            session.streams().addListener(new StreamingQueryListener(){

                public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStarted) {
                    KafkaStreamBuilder$.MODULE$.logger().info("Query started: " + queryStarted.id());
                }

                public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminated) {
                    KafkaStreamBuilder$.MODULE$.logger().info("Query terminated: " + queryTerminated.id());
                }

                public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgress) {
                    KafkaStreamBuilder$.MODULE$.logger().info("Query made progress: " + queryProgress.progress());
                }
            });
        } catch (RuntimeException e) {
            // Registration failed: forget the session so a later call can retry.
            LISTENER_SESSIONS.remove(session);
            throw e;
        }
    }

    private KafkaStreamBuilder$() {
    }
}

