/*
 * Decompiled with CFR 0.152.
 */
package ai.tripl.arc.extract;

import ai.tripl.arc.api.API;
import ai.tripl.arc.extract.KafkaExtract;
import ai.tripl.arc.extract.KafkaExtractStage;
import ai.tripl.arc.extract.KafkaExtractStage$;
import ai.tripl.arc.extract.KafkaPartition;
import ai.tripl.arc.util.DetailException;
import ai.tripl.arc.util.log.logger.Logger;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.TaskContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.LongAccumulator;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple15;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/**
 * Companion object (Scala {@code object}) of {@link KafkaExtractStage}, as emitted by the Scala
 * compiler and recovered by CFR. It holds the stage execution logic together with the case-class
 * {@code apply}/{@code unapply} helpers.
 */
public final class KafkaExtractStage$
implements scala.Serializable {
    /** Singleton instance; assigned by the private constructor during static initialisation. */
    public static KafkaExtractStage$ MODULE$;

    static {
        new KafkaExtractStage$();
    }

    /**
     * Executes a Kafka extract stage and registers the result as a temporary view named
     * {@code stage.outputView()}.
     *
     * <p>In streaming mode the topic is read with Spark's {@code kafka} source. Otherwise the topic
     * is read once, up to the end offsets captured on the driver at the start of the run, with one
     * Spark task (and one {@link KafkaConsumer}) per Kafka partition.
     *
     * <p>For non-streaming runs where {@code persist} is set or {@code autoCommit} is disabled, the
     * result is persisted and counted. Read metrics and per-partition offsets are then written to
     * {@code stage.stageDetail()}. The raw {@link KafkaPartition} list is stored in
     * {@code arcContext.userData()} under {@code "kafkaExtractOffsets"}.
     *
     * @param stage      stage configuration
     * @param spark      active Spark session
     * @param logger     stage logger (not referenced by this method)
     * @param arcContext job context: streaming flag, view immutability, storage level, user data
     * @return the (possibly repartitioned) DataFrame wrapped in an {@link Option}
     * @throws DetailException if the driver-side offset lookup or plan construction fails, or if
     *                         the number of records read differs from the sum of
     *                         {@code endOffset - startOffset} over all partitions
     */
    public Option<Dataset<Row>> execute(KafkaExtractStage stage, SparkSession spark, Logger logger, API.ARCContext arcContext) {
        BoxedUnit boxedUnit;
        Dataset dataset;
        Dataset dataset2;
        // Executor-side counters: records read, key+value bytes read, and the offset range of each partition.
        LongAccumulator recordAccumulator = spark.sparkContext().longAccumulator();
        LongAccumulator bytesAccumulator = spark.sparkContext().longAccumulator();
        CollectionAccumulator kafkaPartitionAccumulator = spark.sparkContext().collectionAccumulator();
        if (arcContext.isStreaming()) {
            dataset2 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", stage.bootstrapServers()).option("subscribe", stage.topic()).load();
        } else {
            Properties commonProps = new Properties();
            commonProps.put("bootstrap.servers", stage.bootstrapServers());
            commonProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            commonProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // Offsets are committed explicitly (or not at all), never by the consumer itself.
            commonProps.put("enable.auto.commit", "false");
            commonProps.put("auto.offset.reset", "earliest");
            // Timeouts are derived from stage.timeout().
            // NOTE(review): stage.timeout() <= 2 makes some of these values non-positive.
            commonProps.put("request.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)stage.timeout())).toString());
            commonProps.put("session.timeout.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(10000L, stage.timeout() - 1L))).toString());
            commonProps.put("fetch.max.wait.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(500L, stage.timeout() - 1L))).toString());
            commonProps.put("heartbeat.interval.ms", ((Object)BoxesRunTime.boxToLong((long)Math.min(3000L, stage.timeout() - 2L))).toString());
            // Honour the stage's maxPollRecords setting. Previously it was read and discarded,
            // so the Kafka client default was always used.
            commonProps.put("max.poll.records", String.valueOf(stage.maxPollRecords()));
            Properties props = new Properties();
            props.putAll((Map<?, ?>)commonProps);
            props.put("group.id", stage.groupID());
            // Driver-side snapshot of the (exclusive) end offset of every partition.
            // Records at or beyond it are not read in this run.
            Map endOffsets = KafkaExtractStage$.liftedTree1$1(props, stage);
            String stageGroupID = stage.groupID();
            String stageTopic = stage.topic();
            long stageTimeout = stage.timeout();
            boolean stageAutoCommit = stage.autoCommit();
            // With autoCommit disabled nothing is committed here. The offsets read are exposed
            // through arcContext.userData() ("kafkaExtractOffsets") further below.
            dataset2 = this.liftedTree2$1(spark, endOffsets, commonProps, stageGroupID, stageTopic, stageTimeout, recordAccumulator, bytesAccumulator, kafkaPartitionAccumulator, stageAutoCommit, stage);
        }
        Dataset df = dataset2;
        List<String> list = stage.partitionBy();
        if (Nil$.MODULE$.equals(list)) {
            // No partition columns: optionally repartition to a fixed count.
            Dataset dataset3;
            Option<Object> option = stage.numPartitions();
            if (option instanceof Some) {
                Some some = (Some)option;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.value());
                dataset3 = df.repartition(numPartitions);
            } else if (None$.MODULE$.equals(option)) {
                dataset3 = df;
            } else {
                throw new MatchError(option);
            }
            dataset = dataset3;
        } else {
            // Partition columns given: hash-partition by them, optionally to a fixed count.
            Dataset dataset4;
            List partitionCols = (List)list.map((Function1 & Serializable & scala.Serializable)col -> df.apply(col), List$.MODULE$.canBuildFrom());
            Option<Object> option = stage.numPartitions();
            if (option instanceof Some) {
                Some some = (Some)option;
                int numPartitions = BoxesRunTime.unboxToInt((Object)some.value());
                dataset4 = df.repartition(numPartitions, (Seq)partitionCols);
            } else if (None$.MODULE$.equals(option)) {
                dataset4 = df.repartition((Seq)partitionCols);
            } else {
                throw new MatchError(option);
            }
            dataset = dataset4;
        }
        Dataset repartitionedDF = dataset;
        if (arcContext.immutableViews()) {
            repartitionedDF.createTempView(stage.outputView());
        } else {
            repartitionedDF.createOrReplaceTempView(stage.outputView());
        }
        if (!repartitionedDF.isStreaming()) {
            stage.stageDetail().put((Object)"outputColumns", (Object)repartitionedDF.schema().length());
            boxedUnit = stage.stageDetail().put((Object)"numPartitions", (Object)repartitionedDF.rdd().partitions().length);
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
        // Equivalent to: (persist || !autoCommit) && !streaming.
        if (!(!stage.persist() && stage.autoCommit() || repartitionedDF.isStreaming())) {
            repartitionedDF.persist(arcContext.storageLevel());
            // count() forces the read, which populates the accumulators used below.
            long recordCount = repartitionedDF.count();
            HashMap<String, Long> inputMetricsMap = new HashMap<String, Long>();
            inputMetricsMap.put("recordsRead", Predef$.MODULE$.Long2long(recordAccumulator.value()));
            inputMetricsMap.put("bytesRead", Predef$.MODULE$.Long2long(bytesAccumulator.value()));
            stage.stageDetail().put((Object)"inputMetrics", inputMetricsMap);
            List kafkaPartitions = ((TraversableOnce)JavaConverters$.MODULE$.asScalaBufferConverter(kafkaPartitionAccumulator.value()).asScala()).toList();
            arcContext.userData().put((Object)"kafkaExtractOffsets", (Object)kafkaPartitions);
            HashMap partitions = new HashMap();
            kafkaPartitions.foreach((Function1 & Serializable & scala.Serializable)kafkaPartition -> {
                HashMap<String, Long> partitionOffsets = new HashMap<String, Long>();
                partitionOffsets.put("startOffset", BoxesRunTime.boxToLong((long)kafkaPartition.position()));
                partitionOffsets.put("endOffset", BoxesRunTime.boxToLong((long)kafkaPartition.endOffset()));
                return partitions.put(BoxesRunTime.boxToInteger((int)kafkaPartition.topicPartition().partition()), partitionOffsets);
            });
            stage.stageDetail().put((Object)"partitionsOffsets", partitions);
            // Expected record count: sum over partitions of (endOffset - startPosition).
            long offsetsSum = BoxesRunTime.unboxToLong((Object)kafkaPartitions.foldLeft((Object)BoxesRunTime.boxToLong((long)0L), (Function2 & Serializable & scala.Serializable)(state, kafkaPartition) -> BoxesRunTime.boxToLong((long)KafkaExtractStage$.$anonfun$execute$8(BoxesRunTime.unboxToLong((Object)state), kafkaPartition))));
            stage.stageDetail().put((Object)"records", (Object)recordCount);
            // NOTE(review): offset gaps (e.g. compacted topics or transaction markers) would presumably also fail this check.
            if (offsetsSum != recordCount) {
                throw new DetailException(stage, offsetsSum, recordCount){
                    private final scala.collection.mutable.Map<String, Object> detail;

                    public scala.collection.mutable.Map<String, Object> detail() {
                        return this.detail;
                    }
                    {
                        this.detail = stage$1.stageDetail();
                    }
                };
            }
        }
        return Option$.MODULE$.apply((Object)repartitionedDF);
    }

    /** Case-class factory: builds a {@link KafkaExtractStage} from its fields. */
    public KafkaExtractStage apply(KafkaExtract plugin, Option<String> id, String name, Option<String> description, String outputView, String topic, String bootstrapServers, String groupID, int maxPollRecords, long timeout, boolean autoCommit, scala.collection.immutable.Map<String, String> params, boolean persist, Option<Object> numPartitions, List<String> partitionBy) {
        return new KafkaExtractStage(plugin, id, name, description, outputView, topic, bootstrapServers, groupID, maxPollRecords, timeout, autoCommit, params, persist, numPartitions, partitionBy);
    }

    /**
     * Case-class extractor: returns the stage's fields as a boxed {@link Tuple15}, or
     * {@code None} for a {@code null} argument.
     */
    public Option<Tuple15<KafkaExtract, Option<String>, String, Option<String>, String, String, String, String, Object, Object, Object, scala.collection.immutable.Map<String, String>, Object, Option<Object>, List<String>>> unapply(KafkaExtractStage x$0) {
        return x$0 == null ? None$.MODULE$ : new Some((Object)new Tuple15((Object)x$0.plugin(), x$0.id(), (Object)x$0.name(), x$0.description(), (Object)x$0.outputView(), (Object)x$0.topic(), (Object)x$0.bootstrapServers(), (Object)x$0.groupID(), (Object)BoxesRunTime.boxToInteger((int)x$0.maxPollRecords()), (Object)BoxesRunTime.boxToLong((long)x$0.timeout()), (Object)BoxesRunTime.boxToBoolean((boolean)x$0.autoCommit()), x$0.params(), (Object)BoxesRunTime.boxToBoolean((boolean)x$0.persist()), x$0.numPartitions(), x$0.partitionBy()));
    }

    /** Preserves the singleton across Java deserialization. */
    private Object readResolve() {
        return MODULE$;
    }

    /**
     * Uses a short-lived driver-side consumer to list the partitions of {@code stage.topic()} and
     * fetch their end offsets.
     *
     * @param props$1 consumer properties, including the stage's {@code group.id}
     * @param stage$1 stage configuration; supplies the topic and the error detail map
     * @return map from {@link TopicPartition} to its end offset
     * @throws DetailException wrapping any exception raised while creating or using the consumer
     */
    private static final /* synthetic */ Map liftedTree1$1(Properties props$1, KafkaExtractStage stage$1) {
        Map map;
        try (KafkaConsumer kafkaDriverConsumer = new KafkaConsumer(props$1);){
            java.util.List partitionInfos = kafkaDriverConsumer.partitionsFor(stage$1.topic());
            java.util.List topicPartitions = (java.util.List)JavaConverters$.MODULE$.bufferAsJavaListConverter((Buffer)((TraversableLike)JavaConverters$.MODULE$.asScalaBufferConverter(partitionInfos).asScala()).map((Function1 & Serializable & scala.Serializable)partitionInfo -> new TopicPartition(stage$1.topic(), partitionInfo.partition()), Buffer$.MODULE$.canBuildFrom())).asJava();
            map = kafkaDriverConsumer.endOffsets((Collection)topicPartitions);
        }
        catch (Exception e) {
            throw new DetailException(e, stage$1){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$1.stageDetail();
                }
            };
        }
        return map;
    }

    /**
     * Returns whether a record lies inside the driver's snapshot range.
     *
     * <p>{@code endOffset$1} comes from {@link KafkaConsumer#endOffsets}, which is exclusive: it is
     * the offset of the last record plus one. A record at exactly that offset was written after the
     * snapshot and must be skipped. Skipping it keeps the result consistent with the
     * {@code endOffset - position} record-count check and with committing {@code endOffset} as the
     * next offset to read.
     */
    public static final /* synthetic */ boolean $anonfun$execute$3(long endOffset$1, ConsumerRecord consumerRecord) {
        return consumerRecord.offset() < endOffset$1;
    }

    /**
     * Performs a single {@code poll} (bounded by the stage timeout) and returns the records of
     * {@code stageTopic$1} whose offset is below {@code endOffset$1}, converted to
     * {@link KafkaExtractStage.KafkaRecord}. Each returned record increments the record
     * accumulator; its key plus value byte length is added to the bytes accumulator.
     */
    private static final List getKafkaRecord$1(KafkaConsumer kafkaConsumer$1, long stageTimeout$1, String stageTopic$1, long endOffset$1, LongAccumulator recordAccumulator$1, LongAccumulator bytesAccumulator$1) {
        return ((TraversableOnce)((TraversableLike)((TraversableLike)JavaConverters$.MODULE$.iterableAsScalaIterableConverter(kafkaConsumer$1.poll(Duration.ofMillis(stageTimeout$1)).records(stageTopic$1)).asScala()).filter((Function1 & Serializable & scala.Serializable)consumerRecord -> BoxesRunTime.boxToBoolean((boolean)KafkaExtractStage$.$anonfun$execute$3(endOffset$1, consumerRecord)))).map((Function1 & Serializable & scala.Serializable)consumerRecord -> {
            recordAccumulator$1.add(1L);
            bytesAccumulator$1.add((long)((consumerRecord.key() != null ? ((byte[])consumerRecord.key()).length : 0) + (consumerRecord.value() != null ? ((byte[])consumerRecord.value()).length : 0)));
            return new KafkaExtractStage.KafkaRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), (byte[])consumerRecord.key(), (byte[])consumerRecord.value());
        }, Iterable$.MODULE$.canBuildFrom())).toList();
    }

    /**
     * Appends successive poll batches to {@code kafkaRecordsAccumulator} until a poll yields no
     * in-range records, then returns the accumulated list in read order. This is the loop form of
     * a tail-recursive Scala helper.
     */
    private final List getAllKafkaRecords$1(List kafkaRecords, List kafkaRecordsAccumulator, KafkaConsumer kafkaConsumer$1, long stageTimeout$1, String stageTopic$1, long endOffset$1, LongAccumulator recordAccumulator$1, LongAccumulator bytesAccumulator$1) {
        while (true) {
            List list;
            if (Nil$.MODULE$.equals(list = kafkaRecords)) break;
            List list2 = kafkaRecordsAccumulator;
            // Scala `a.:::(b)` is `b ::: a`: accumulated records first, then the new batch.
            kafkaRecordsAccumulator = kafkaRecords.$colon$colon$colon(list2);
            kafkaRecords = KafkaExtractStage$.getKafkaRecord$1(kafkaConsumer$1, stageTimeout$1, stageTopic$1, endOffset$1, recordAccumulator$1, bytesAccumulator$1);
        }
        List list = kafkaRecordsAccumulator;
        return list;
    }

    /**
     * Builds the batch DataFrame. The plan creates an empty RDD with one partition per Kafka
     * partition and has each Spark task read its Kafka partition up to the snapshot end offset.
     *
     * <p>Each task uses its own consumer with {@code group.id = "<groupID>-<partitionId>"}. When
     * {@code stageAutoCommit$1} is true, the task commits {@code endOffset} for its partition after
     * reading. Every task also adds a {@link KafkaPartition} (start position, end offset) to
     * {@code kafkaPartitionAccumulator$1}.
     *
     * @return the records as a DataFrame with the {@link KafkaExtractStage.KafkaRecord} schema
     * @throws DetailException wrapping any exception raised while building the plan.
     *         NOTE(review): mapPartitions is lazy, so failures inside the per-partition function
     *         presumably surface at action time, outside this try.
     */
    private final /* synthetic */ Dataset liftedTree2$1(SparkSession spark$1, Map endOffsets$1, Properties commonProps$1, String stageGroupID$1, String stageTopic$1, long stageTimeout$1, LongAccumulator recordAccumulator$1, LongAccumulator bytesAccumulator$1, CollectionAccumulator kafkaPartitionAccumulator$1, boolean stageAutoCommit$1, KafkaExtractStage stage$1) {
        Dataset dataset;
        try {
            RDD qual$1 = spark$1.sparkContext().parallelize((Seq)Nil$.MODULE$, spark$1.sparkContext().parallelize$default$2(), ClassTag$.MODULE$.apply(String.class)).repartition(endOffsets$1.size(), (Ordering)Ordering.String$.MODULE$);
            Function1 & Serializable & scala.Serializable x$1 = (Function1 & Serializable & scala.Serializable)partition -> {
                Iterator iterator;
                // Spark partition i reads Kafka partition i.
                // NOTE(review): assumes Kafka partition ids are exactly 0..n-1.
                int partitionId = TaskContext$.MODULE$.getPartitionId();
                Properties props = new Properties();
                props.putAll((Map<?, ?>)commonProps$1);
                props.put("group.id", new StringBuilder(1).append(stageGroupID$1).append("-").append(partitionId).toString());
                TopicPartition topicPartition = new TopicPartition(stageTopic$1, partitionId);
                // Resolve the end offset before opening the consumer, so a failed lookup/unboxing
                // cannot leak an unclosed KafkaConsumer.
                long endOffset = (Long)endOffsets$1.get(topicPartition);
                KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
                try {
                    kafkaConsumer.assign((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)topicPartition, (List)Nil$.MODULE$)).asJava());
                    kafkaPartitionAccumulator$1.add((Object)new KafkaPartition(topicPartition, kafkaConsumer.position(topicPartition), endOffset));
                    List dataset = this.getAllKafkaRecords$1(KafkaExtractStage$.getKafkaRecord$1(kafkaConsumer, stageTimeout$1, stageTopic$1, endOffset, recordAccumulator$1, bytesAccumulator$1), (List)Nil$.MODULE$, kafkaConsumer, stageTimeout$1, stageTopic$1, endOffset, recordAccumulator$1, bytesAccumulator$1);
                    if (stageAutoCommit$1) {
                        // endOffset is exclusive, so it is the next offset to consume.
                        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
                        offsets.put(topicPartition, new OffsetAndMetadata(endOffset));
                        kafkaConsumer.commitSync(offsets);
                    }
                    iterator = dataset.toIterator();
                }
                finally {
                    kafkaConsumer.close();
                }
                return iterator;
            };
            boolean x$2 = qual$1.mapPartitions$default$2();
            JavaUniverse $u = package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class Ai_tripl_arc_extract_KafkaExtractStage$$typecreator5$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("ai.tripl.arc.extract.KafkaExtractStage.KafkaRecord").asType().toTypeConstructor();
                }

                public Ai_tripl_arc_extract_KafkaExtractStage$$typecreator5$1() {
                }
            }
            dataset = spark$1.implicits().rddToDatasetHolder(qual$1.mapPartitions((Function1)x$1, x$2, ClassTag$.MODULE$.apply(KafkaExtractStage.KafkaRecord.class)), spark$1.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Ai_tripl_arc_extract_KafkaExtractStage$$typecreator5$1()))).toDF();
        }
        catch (Exception e) {
            throw new DetailException(e, stage$1){
                private final scala.collection.mutable.Map<String, Object> detail;

                public scala.collection.mutable.Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$1.stageDetail();
                }
            };
        }
        return dataset;
    }

    /** Fold step: adds this partition's expected record count ({@code endOffset - position}) to {@code state}. */
    public static final /* synthetic */ long $anonfun$execute$8(long state, KafkaPartition kafkaPartition) {
        return state + (kafkaPartition.endOffset() - kafkaPartition.position());
    }

    /** Private singleton constructor; publishes {@code this} as {@link #MODULE$}. */
    private KafkaExtractStage$() {
        MODULE$ = this;
    }
}

