/*
 * Decompiled with CFR 0.152.
 */
package ai.tripl.arc.extract;

import ai.tripl.arc.api.API;
import ai.tripl.arc.extract.KafkaExtract;
import ai.tripl.arc.extract.KafkaExtractStage;
import ai.tripl.arc.extract.KafkaExtractStage$;
import ai.tripl.arc.util.DetailException;
import ai.tripl.arc.util.log.logger.Logger;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.Tuple14;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

public final class KafkaExtractStage$
implements scala.Serializable {
    // Singleton instance of this companion object; assigned by the private constructor.
    public static KafkaExtractStage$ MODULE$;

    // Instantiates the singleton when the class is initialized. The constructed
    // object is not stored here; the constructor itself publishes it to MODULE$.
    static {
        new KafkaExtractStage$();
    }

    /**
     * Runs the Kafka extract stage and returns the resulting DataFrame.
     *
     * <p>When {@code arcContext.isStreaming()} is true the data is read through
     * {@code spark.readStream().format("kafka")}. Otherwise a batch read is performed:
     * the number of topic partitions is obtained from a driver-side consumer
     * ({@code liftedTree1$1}), an empty DataFrame is repartitioned to that count, and each
     * Spark partition opens its own {@code KafkaConsumer} assigned to the Kafka partition
     * whose number equals the Spark partition id, polling until a poll returns no records.
     *
     * <p>The result is then optionally repartitioned according to
     * {@code stage.numPartitions()} / {@code stage.partitionBy()}, registered as a temp
     * view named {@code stage.outputView()}, and summary values are written into
     * {@code stage.stageDetail()}.
     *
     * @param stage      stage configuration (topic, bootstrap servers, group id, timeouts, ...)
     * @param spark      session used to build the DataFrame
     * @param logger     not referenced in this method body
     * @param arcContext supplies the {@code isStreaming()} and {@code immutableViews()} flags
     * @return the (possibly repartitioned) DataFrame wrapped via {@code Option.apply}
     * @throws DetailException wrapping any {@code Exception} thrown while building the batch
     *         Dataset in this method, or raised by {@code liftedTree1$1}
     */
    public Option<Dataset<Row>> execute(KafkaExtractStage stage, SparkSession spark, Logger logger, API.ARCContext arcContext) {
        // boxedUnit / boxedUnit2 are assigned below but never read (decompiler residue).
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        Dataset dataset;
        Dataset dataset2;
        if (arcContext.isStreaming()) {
            // Streaming mode: subscribe to the topic through the "kafka" stream source.
            dataset2 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", stage.bootstrapServers()).option("subscribe", stage.topic()).load();
        } else {
            // Batch mode. commonProps is reused for the driver-side consumer and for the
            // per-partition consumers created inside the mapPartitions lambda.
            Properties commonProps = new Properties();
            commonProps.put("bootstrap.servers", stage.bootstrapServers());
            commonProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            commonProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            commonProps.put("enable.auto.commit", "false");
            commonProps.put("auto.offset.reset", "earliest");
            // The remaining timeouts are derived from stage.timeout(): each is capped at a
            // fixed maximum and otherwise set slightly below the request timeout.
            commonProps.put("request.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)stage.timeout())).toString());
            commonProps.put("session.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(10000L, stage.timeout() - 1L))).toString());
            commonProps.put("fetch.max.wait.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(500L, stage.timeout() - 1L))).toString());
            commonProps.put("heartbeat.interval.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(3000L, stage.timeout() - 2L))).toString());
            // Driver-side properties: common settings plus the unsuffixed group id.
            Properties props = new Properties();
            props.putAll((Map<?, ?>)commonProps);
            props.put("group.id", stage.groupID());
            // Number of Kafka partitions for the topic, looked up with a short-lived consumer.
            int numPartitions = KafkaExtractStage$.liftedTree1$1(props, stage);
            // Stage fields are copied into locals; the lambda below reads these locals
            // rather than calling accessors on stage.
            int stageMaxPollRecords = stage.maxPollRecords();
            String stageGroupID = stage.groupID();
            String stageTopic = stage.topic();
            long stageTimeout = stage.timeout();
            boolean stageAutoCommit = stage.autoCommit();
            try {
                // Compiler-generated TypeCreator that resolves the KafkaRecord class; it is
                // used below to build the TypeTag passed to newProductEncoder.
                // NOTE(review): a modified local class declaration like this is a
                // decompiler rendering, not compilable Java.
                public final class Ai_tripl_arc_extract_KafkaExtractStage$$typecreator4$1
                extends TypeCreator {
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                        Universe $u = $m$untyped.universe();
                        Mirror<U> $m = $m$untyped;
                        return $m.staticClass("ai.tripl.arc.extract.KafkaExtractStage.KafkaRecord").asType().toTypeConstructor();
                    }

                    public Ai_tripl_arc_extract_KafkaExtractStage$$typecreator4$1() {
                    }
                }
                JavaUniverse $u = package$.MODULE$.universe();
                JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
                // One Spark partition per Kafka partition: start from an empty DataFrame
                // repartitioned to numPartitions and fill each partition from Kafka.
                dataset2 = spark.sqlContext().emptyDataFrame().repartition(numPartitions).mapPartitions((Function1 & Serializable & scala.Serializable)partition -> {
                    Iterator iterator;
                    // The Spark partition id doubles as the Kafka partition number.
                    int partitionId = TaskContext$.MODULE$.getPartitionId();
                    Properties props = new Properties();
                    props.putAll((Map<?, ?>)commonProps);
                    props.put("max.poll.records", ((Object)BoxesRunTime.boxToInteger((int)stageMaxPollRecords)).toString());
                    // Per-partition consumer group: "<groupID>-<partitionId>".
                    props.put("group.id", new StringBuilder(1).append(stageGroupID).append("-").append(partitionId).toString());
                    TopicPartition topicPartition = new TopicPartition(stageTopic, partitionId);
                    // try-with-resources closes the consumer when the block exits.
                    try (KafkaConsumer kafkaConsumer = new KafkaConsumer(props);){
                        // Manually assign exactly one topic partition to this consumer.
                        kafkaConsumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)topicPartition, (List)Nil$.MODULE$)).asJava());
                        // Seed with a first poll, then keep polling until a poll returns nothing.
                        List dataset = this.getAllKafkaRecords$1(KafkaExtractStage$.getKafkaRecord$1(kafkaConsumer, stageTimeout, stageTopic), (List)Nil$.MODULE$, kafkaConsumer, stageTimeout, stageTopic);
                        // Offsets are committed only when the stage has autoCommit enabled.
                        if (stageAutoCommit) {
                            kafkaConsumer.commitSync();
                        }
                        iterator = dataset.toIterator();
                    }
                    return iterator;
                }, spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Ai_tripl_arc_extract_KafkaExtractStage$$typecreator4$1()))).toDF();
            }
            catch (Exception e) {
                // Rethrow with the stage's detail map attached.
                // NOTE(review): stage$1 is not declared in this method; presumably the
                // decompiler's name for the captured `stage` parameter - confirm.
                throw new DetailException(e, stage){
                    private final scala.collection.mutable.Map<String, Object> detail;

                    public scala.collection.mutable.Map<String, Object> detail() {
                        return this.detail;
                    }
                    {
                        this.detail = stage$1.stageDetail();
                    }
                };
            }
        }
        Dataset df = dataset2;
        // Optional repartitioning driven by stage.partitionBy() and stage.numPartitions().
        List<String> list = stage.partitionBy();
        if (Nil$.MODULE$.equals(list)) {
            // No partition columns: repartition by count only when numPartitions is defined.
            Dataset dataset3;
            Option<Object> option = stage.numPartitions();
            if (option instanceof Some) {
                Some some = (Some)option;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.value());
                dataset3 = df.repartition(numPartitions);
            } else if (None$.MODULE$.equals(option)) {
                dataset3 = df;
            } else {
                throw new MatchError(option);
            }
            dataset = dataset3;
        } else {
            // Partition columns given: resolve each name to a column of df and repartition
            // by those columns, with an explicit partition count when one is defined.
            Dataset dataset4;
            List partitionCols = (List)list.map((Function1 & Serializable & scala.Serializable)col -> df.apply(col), List$.MODULE$.canBuildFrom());
            Option<Object> option = stage.numPartitions();
            if (option instanceof Some) {
                Some some = (Some)option;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.value());
                dataset4 = df.repartition(numPartitions, (Seq)partitionCols);
            } else if (None$.MODULE$.equals(option)) {
                dataset4 = df.repartition((Seq)partitionCols);
            } else {
                throw new MatchError(option);
            }
            dataset = dataset4;
        }
        Dataset repartitionedDF = dataset;
        // Register the output view; with immutableViews the non-replacing variant is used.
        if (arcContext.immutableViews()) {
            repartitionedDF.createTempView(stage.outputView());
        } else {
            repartitionedDF.createOrReplaceTempView(stage.outputView());
        }
        // Column and partition counts are recorded only for non-streaming results.
        if (!repartitionedDF.isStreaming()) {
            stage.stageDetail().put((Object)"outputColumns", (Object)repartitionedDF.schema().length());
            boxedUnit2 = stage.stageDetail().put((Object)"numPartitions", (Object)repartitionedDF.rdd().partitions().length);
        } else {
            boxedUnit2 = BoxedUnit.UNIT;
        }
        // Equivalent to: (persist || !autoCommit) && !isStreaming. In that case the result
        // is persisted and its row count is recorded under "records".
        if (!(!stage.persist() && stage.autoCommit() || repartitionedDF.isStreaming())) {
            repartitionedDF.persist(StorageLevel$.MODULE$.MEMORY_AND_DISK_SER());
            boxedUnit = stage.stageDetail().put((Object)"records", (Object)repartitionedDF.count());
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        return Option$.MODULE$.apply((Object)repartitionedDF);
    }

    /**
     * Factory for {@link KafkaExtractStage}: forwards every argument, in order, to the
     * stage constructor and returns the new instance.
     *
     * @return a new stage built from the supplied values
     */
    public KafkaExtractStage apply(KafkaExtract plugin, String name, Option<String> description, String outputView, String topic, String bootstrapServers, String groupID, int maxPollRecords, long timeout, boolean autoCommit, scala.collection.immutable.Map<String, String> params, boolean persist, Option<Object> numPartitions, List<String> partitionBy) {
        KafkaExtractStage created = new KafkaExtractStage(
            plugin,
            name,
            description,
            outputView,
            topic,
            bootstrapServers,
            groupID,
            maxPollRecords,
            timeout,
            autoCommit,
            params,
            persist,
            numPartitions,
            partitionBy);
        return created;
    }

    /**
     * Extractor counterpart of {@code apply}: decomposes a stage into a 14-element tuple
     * of its fields, with primitive fields boxed.
     *
     * @param x$0 the stage to decompose; may be {@code null}
     * @return {@code None} when {@code x$0} is {@code null}, otherwise {@code Some(tuple)}
     */
    public Option<Tuple14<KafkaExtract, String, Option<String>, String, String, String, String, Object, Object, Object, scala.collection.immutable.Map<String, String>, Object, Option<Object>, List<String>>> unapply(KafkaExtractStage x$0) {
        // Guard clause: nothing to extract from a null stage.
        if (x$0 == null) {
            return (Option)None$.MODULE$;
        }
        // Accessors are read in declaration order, matching the constructor parameters.
        Tuple14 fields = new Tuple14(
            (Object)x$0.plugin(),
            (Object)x$0.name(),
            x$0.description(),
            (Object)x$0.outputView(),
            (Object)x$0.topic(),
            (Object)x$0.bootstrapServers(),
            (Object)x$0.groupID(),
            (Object)BoxesRunTime.boxToInteger((int)x$0.maxPollRecords()),
            (Object)BoxesRunTime.boxToLong((long)x$0.timeout()),
            (Object)BoxesRunTime.boxToBoolean((boolean)x$0.autoCommit()),
            x$0.params(),
            (Object)BoxesRunTime.boxToBoolean((boolean)x$0.persist()),
            x$0.numPartitions(),
            x$0.partitionBy());
        return new Some((Object)fields);
    }

    /**
     * Serialization hook that returns the shared {@code MODULE$} instance instead of
     * whatever object the caller would otherwise have used.
     *
     * @return the singleton held in {@code MODULE$}
     */
    private Object readResolve() {
        return KafkaExtractStage$.MODULE$;
    }

    /**
     * Looks up how many partitions the stage's topic has, using a consumer that exists
     * only for the duration of this call.
     *
     * @param props$1 consumer configuration used to create the temporary consumer
     * @param stage$1 stage whose {@code topic()} is queried and whose {@code stageDetail()}
     *                is attached to any failure
     * @return the size of the list returned by {@code partitionsFor(topic)}
     * @throws DetailException wrapping any {@code Exception} raised while creating, using
     *         or closing the consumer
     */
    private static final int liftedTree1$1(Properties props$1, KafkaExtractStage stage$1) {
        // The consumer is closed by try-with-resources before the value is returned.
        try (KafkaConsumer driverConsumer = new KafkaConsumer(props$1)) {
            return driverConsumer.partitionsFor(stage$1.topic()).size();
        }
        catch (Exception cause) {
            // Re-raise with the stage's detail map so callers can report stage context.
            throw new DetailException(cause, stage$1){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$1.stageDetail();
                }
            };
        }
    }

    /**
     * Performs a single poll on the consumer and converts the records returned for the
     * given topic into a Scala {@code List} of {@code KafkaExtractStage.KafkaRecord}.
     *
     * @param kafkaConsumer$1 consumer to poll
     * @param stageTimeout$1  poll timeout in milliseconds
     * @param stageTopic$1    topic whose records are taken from the poll result
     * @return the converted records; empty when the poll yields nothing for the topic
     */
    private static final List getKafkaRecord$1(KafkaConsumer kafkaConsumer$1, long stageTimeout$1, String stageTopic$1) {
        // Step 1: poll once and keep only this topic's records.
        java.lang.Iterable polled = kafkaConsumer$1.poll(Duration.ofMillis(stageTimeout$1)).records(stageTopic$1);
        // Step 2: view the Java iterable as a Scala collection.
        Object scalaRecords = JavaConverters$.MODULE$.iterableAsScalaIterableConverter(polled).asScala();
        // Step 3: copy the fields of each consumer record into a KafkaRecord.
        Object converted = ((TraversableLike)scalaRecords).map((Function1 & Serializable & scala.Serializable)consumerRecord -> new KafkaExtractStage.KafkaRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), (byte[])consumerRecord.key(), (byte[])consumerRecord.value()), Iterable$.MODULE$.canBuildFrom());
        // Step 4: materialize as an immutable List.
        return ((TraversableOnce)converted).toList();
    }

    /**
     * Drains the consumer: appends each non-empty batch to the accumulator and polls
     * again, stopping at the first empty batch.
     *
     * @param kafkaRecords            the batch to process first
     * @param kafkaRecordsAccumulator records gathered so far
     * @param kafkaConsumer$1         consumer used for subsequent polls
     * @param stageTimeout$1          poll timeout in milliseconds, passed to each poll
     * @param stageTopic$1            topic whose records are read
     * @return the accumulator followed by every non-empty batch, in poll order
     */
    private final List getAllKafkaRecords$1(List kafkaRecords, List kafkaRecordsAccumulator, KafkaConsumer kafkaConsumer$1, long stageTimeout$1, String stageTopic$1) {
        List pending = kafkaRecords;
        List collected = kafkaRecordsAccumulator;
        while (!Nil$.MODULE$.equals(pending)) {
            // `pending.:::(collected)` prepends collected, i.e. yields collected ++ pending.
            collected = pending.$colon$colon$colon(collected);
            pending = KafkaExtractStage$.getKafkaRecord$1(kafkaConsumer$1, stageTimeout$1, stageTopic$1);
        }
        return collected;
    }

    // Private constructor: publishes the newly created instance as the MODULE$ singleton.
    // It is invoked from the static initializer of this class.
    private KafkaExtractStage$() {
        MODULE$ = this;
    }
}

