/*
 * Decompiled with CFR 0.152.
 */
package ai.tripl.arc.extract;

import ai.tripl.arc.api.API;
import ai.tripl.arc.extract.KafkaExtract;
import ai.tripl.arc.extract.KafkaExtractStage;
import ai.tripl.arc.extract.KafkaExtractStage$;
import ai.tripl.arc.util.DetailException;
import ai.tripl.arc.util.log.logger.Logger;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple14;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

public final class KafkaExtractStage$
implements Serializable {
    // Singleton instance of this companion object. It is assigned from inside the
    // private constructor rather than by the static initializer directly.
    public static final KafkaExtractStage$ MODULE$;

    static {
        // Instantiating the class is enough: the constructor stores `this` in MODULE$.
        new KafkaExtractStage$();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Option<Dataset<Row>> execute(KafkaExtractStage stage, SparkSession spark, Logger logger, API.ARCContext arcContext) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        Dataset dataset;
        Dataset dataset2;
        if (arcContext.isStreaming()) {
            dataset2 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", stage.bootstrapServers()).option("subscribe", stage.topic()).load();
        } else {
            Properties commonProps = new Properties();
            commonProps.put("bootstrap.servers", stage.bootstrapServers());
            commonProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            commonProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            commonProps.put("enable.auto.commit", "false");
            commonProps.put("auto.offset.reset", "earliest");
            commonProps.put("request.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)stage.timeout())).toString());
            commonProps.put("session.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(10000L, stage.timeout() - 1L))).toString());
            commonProps.put("fetch.max.wait.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(500L, stage.timeout() - 1L))).toString());
            commonProps.put("heartbeat.interval.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(3000L, stage.timeout() - 2L))).toString());
            Properties props = new Properties();
            props.putAll((Map<?, ?>)commonProps);
            props.put("group.id", stage.groupID());
            int numPartitions = this.liftedTree1$1(stage, props);
            int stageMaxPollRecords = stage.maxPollRecords();
            String stageGroupID = stage.groupID();
            String stageTopic = stage.topic();
            long stageTimeout = stage.timeout();
            boolean stageAutoCommit = stage.autoCommit();
            try {
                JavaUniverse $u = package$.MODULE$.universe();
                JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
                public final class Ai_tripl_arc_extract_KafkaExtractStage$$typecreator4$1
                extends TypeCreator {
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                        Universe $u = $m$untyped.universe();
                        Mirror<U> $m = $m$untyped;
                        return $m.staticClass("ai.tripl.arc.extract.KafkaExtractStage.KafkaRecord").asType().toTypeConstructor();
                    }

                    public Ai_tripl_arc_extract_KafkaExtractStage$$typecreator4$1() {
                    }
                }
                dataset2 = spark.sqlContext().emptyDataFrame().repartition(numPartitions).mapPartitions((Function1)new Serializable(commonProps, stageMaxPollRecords, stageGroupID, stageTopic, stageTimeout, stageAutoCommit){
                    public static final long serialVersionUID = 0L;
                    private final Properties commonProps$1;
                    private final int stageMaxPollRecords$1;
                    private final String stageGroupID$1;
                    private final String stageTopic$1;
                    private final long stageTimeout$1;
                    private final boolean stageAutoCommit$1;

                    /*
                     * WARNING - void declaration
                     */
                    public final Iterator<KafkaExtractStage.KafkaRecord> apply(Iterator<Row> partition) {
                        Iterator iterator;
                        int partitionId = TaskContext$.MODULE$.getPartitionId();
                        Properties props = new Properties();
                        props.putAll((Map<?, ?>)this.commonProps$1);
                        props.put("max.poll.records", ((Object)BoxesRunTime.boxToInteger((int)this.stageMaxPollRecords$1)).toString());
                        props.put("group.id", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "-", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.stageGroupID$1, BoxesRunTime.boxToInteger((int)partitionId)})));
                        KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
                        TopicPartition topicPartition = new TopicPartition(this.stageTopic$1, partitionId);
                        try {
                            kafkaConsumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{topicPartition}))).asJava());
                            List dataset = this.getAllKafkaRecords$1(this.getKafkaRecord$1(kafkaConsumer), (List)Nil$.MODULE$, kafkaConsumer);
                            if (this.stageAutoCommit$1) {
                                kafkaConsumer.commitSync();
                            }
                            iterator = dataset.toIterator();
                        }
                        catch (Throwable throwable) {
                            void var4_4;
                            var4_4.close();
                            throw throwable;
                        }
                        kafkaConsumer.close();
                        return iterator;
                    }

                    private final List getKafkaRecord$1(KafkaConsumer kafkaConsumer$1) {
                        return ((TraversableOnce)((TraversableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(kafkaConsumer$1.poll(Duration.ofMillis(this.stageTimeout$1)).records(this.stageTopic$1)).asScala()).map((Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final KafkaExtractStage.KafkaRecord apply(ConsumerRecord<byte[], byte[]> consumerRecord) {
                                return new KafkaExtractStage.KafkaRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), (byte[])consumerRecord.key(), (byte[])consumerRecord.value());
                            }
                        }, Iterable$.MODULE$.canBuildFrom())).toList();
                    }

                    private final List getAllKafkaRecords$1(List kafkaRecords, List kafkaRecordsAccumulator, KafkaConsumer kafkaConsumer$1) {
                        while (true) {
                            List list;
                            if (Nil$.MODULE$.equals(list = kafkaRecords)) {
                                List list2 = kafkaRecordsAccumulator;
                                return list2;
                            }
                            List list3 = kafkaRecordsAccumulator;
                            kafkaRecordsAccumulator = kafkaRecords.$colon$colon$colon(list3);
                            kafkaRecords = this.getKafkaRecord$1(kafkaConsumer$1);
                        }
                    }
                    {
                        this.commonProps$1 = commonProps$1;
                        this.stageMaxPollRecords$1 = stageMaxPollRecords$1;
                        this.stageGroupID$1 = stageGroupID$1;
                        this.stageTopic$1 = stageTopic$1;
                        this.stageTimeout$1 = stageTimeout$1;
                        this.stageAutoCommit$1 = stageAutoCommit$1;
                    }
                }, spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Ai_tripl_arc_extract_KafkaExtractStage$$typecreator4$1()))).toDF();
            }
            catch (Exception exception) {
                throw new DetailException(stage, exception){
                    private final scala.collection.mutable.Map<String, Object> detail;

                    public scala.collection.mutable.Map<String, Object> detail() {
                        return this.detail;
                    }
                    {
                        this.detail = stage$1.stageDetail();
                    }
                };
            }
        }
        Dataset df = dataset2;
        List<String> list = stage.partitionBy();
        if (Nil$.MODULE$.equals(list)) {
            Dataset dataset3;
            Option<Object> option2 = stage.numPartitions();
            if (option2 instanceof Some) {
                Some some = (Some)option2;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.x());
                dataset3 = df.repartition(numPartitions);
            } else {
                if (!None$.MODULE$.equals(option2)) throw new MatchError(option2);
                dataset3 = df;
            }
            dataset = dataset3;
        } else {
            Dataset dataset22;
            List partitionCols = (List)list.map((Function1)new Serializable(df){
                public static final long serialVersionUID = 0L;
                private final Dataset df$1;

                public final Column apply(String col) {
                    return this.df$1.apply(col);
                }
                {
                    this.df$1 = df$1;
                }
            }, List$.MODULE$.canBuildFrom());
            Option<Object> option = stage.numPartitions();
            if (option instanceof Some) {
                Some some = (Some)option;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.x());
                dataset22 = df.repartition(numPartitions, (Seq)partitionCols);
            } else {
                if (!None$.MODULE$.equals(option)) throw new MatchError(option);
                dataset22 = df.repartition((Seq)partitionCols);
            }
            dataset = dataset22;
        }
        Dataset repartitionedDF = dataset;
        if (arcContext.immutableViews()) {
            repartitionedDF.createTempView(stage.outputView());
        } else {
            repartitionedDF.createOrReplaceTempView(stage.outputView());
        }
        if (repartitionedDF.isStreaming()) {
            boxedUnit2 = BoxedUnit.UNIT;
        } else {
            stage.stageDetail().put((Object)"outputColumns", (Object)repartitionedDF.schema().length());
            boxedUnit2 = stage.stageDetail().put((Object)"numPartitions", (Object)repartitionedDF.rdd().partitions().length);
        }
        if (!(!stage.persist() && stage.autoCommit() || repartitionedDF.isStreaming())) {
            repartitionedDF.persist(StorageLevel$.MODULE$.MEMORY_AND_DISK_SER());
            boxedUnit = stage.stageDetail().put((Object)"records", (Object)repartitionedDF.count());
            return Option$.MODULE$.apply((Object)repartitionedDF);
        }
        boxedUnit = BoxedUnit.UNIT;
        return Option$.MODULE$.apply((Object)repartitionedDF);
    }

    /**
     * Factory method: builds a {@code KafkaExtractStage} by passing every argument,
     * in the same order, straight to its constructor.
     *
     * @return the newly constructed stage
     */
    public KafkaExtractStage apply(KafkaExtract plugin, String name, Option<String> description, String outputView, String topic, String bootstrapServers, String groupID, int maxPollRecords, long timeout, boolean autoCommit, scala.collection.immutable.Map<String, String> params, boolean persist, Option<Object> numPartitions, List<String> partitionBy) {
        return new KafkaExtractStage(plugin, name, description, outputView, topic, bootstrapServers, groupID, maxPollRecords, timeout, autoCommit, params, persist, numPartitions, partitionBy);
    }

    /**
     * Extractor counterpart of {@code apply}: breaks a stage into a 14-element tuple of its fields.
     *
     * @param x$0 the stage to take apart; may be {@code null}
     * @return {@code None} for a {@code null} stage, otherwise {@code Some} holding the fields in
     *         constructor order, with the int, long and boolean fields boxed
     */
    public Option<Tuple14<KafkaExtract, String, Option<String>, String, String, String, String, Object, Object, Object, scala.collection.immutable.Map<String, String>, Object, Option<Object>, List<String>>> unapply(KafkaExtractStage x$0) {
        Option extracted;
        if (x$0 == null) {
            extracted = None$.MODULE$;
        } else {
            Tuple14 fields = new Tuple14(
                (Object)x$0.plugin(),
                (Object)x$0.name(),
                x$0.description(),
                (Object)x$0.outputView(),
                (Object)x$0.topic(),
                (Object)x$0.bootstrapServers(),
                (Object)x$0.groupID(),
                (Object)BoxesRunTime.boxToInteger((int)x$0.maxPollRecords()),
                (Object)BoxesRunTime.boxToLong((long)x$0.timeout()),
                (Object)BoxesRunTime.boxToBoolean((boolean)x$0.autoCommit()),
                x$0.params(),
                (Object)BoxesRunTime.boxToBoolean((boolean)x$0.persist()),
                x$0.numPartitions(),
                x$0.partitionBy());
            extracted = new Some((Object)fields);
        }
        return extracted;
    }

    /**
     * Java serialization hook: substitutes the shared {@code MODULE$} instance for any
     * deserialized copy of this class.
     */
    private Object readResolve() {
        return MODULE$;
    }

    /*
     * WARNING - void declaration
     */
    private final int liftedTree1$1(KafkaExtractStage stage$1, Properties props$1) {
        try {
            int n;
            KafkaConsumer kafkaDriverConsumer = new KafkaConsumer(props$1);
            try {
                n = kafkaDriverConsumer.partitionsFor(stage$1.topic()).size();
            }
            catch (Throwable throwable) {
                void var4_3;
                var4_3.close();
                throw throwable;
            }
            kafkaDriverConsumer.close();
            return n;
        }
        catch (Exception exception) {
            throw new DetailException(stage$1, exception){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$1.stageDetail();
                }
            };
        }
    }

    // Private so the class cannot be instantiated from outside; publishes the new
    // instance as the MODULE$ singleton.
    private KafkaExtractStage$() {
        MODULE$ = this;
    }
}

