/*
 * Decompiled with CFR 0.152.
 */
package ai.tripl.arc.execute;

import ai.tripl.arc.api.API;
import ai.tripl.arc.execute.KafkaCommitExecute;
import ai.tripl.arc.execute.KafkaCommitExecuteStage;
import ai.tripl.arc.util.DetailException;
import ai.tripl.arc.util.log.logger.Logger;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple7;
import scala.collection.Seq;
import scala.collection.mutable.Map;
import scala.runtime.BoxesRunTime;

public final class KafkaCommitExecuteStage$
implements Serializable {
    public static final KafkaCommitExecuteStage$ MODULE$;

    static {
        new KafkaCommitExecuteStage$();
    }

    public Option<Dataset<Row>> execute(KafkaCommitExecuteStage stage, SparkSession spark, Logger logger, API.ARCContext arcContext) {
        Dataset df = spark.table(stage.inputView());
        HashMap offsetsLogMap = new HashMap();
        try {
            Dataset offset = df.groupBy((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{df.apply("topic"), df.apply("partition")})).agg(functions$.MODULE$.max(df.apply("offset")), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[0])).orderBy((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{df.apply("topic"), df.apply("partition")})).limit(10000);
            Properties props = new Properties();
            props.put("bootstrap.servers", stage.bootstrapServers());
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");
            Predef$.MODULE$.refArrayOps((Object[])offset.collect()).foreach((Function1)new Serializable(stage, offsetsLogMap, props){
                public static final long serialVersionUID = 0L;
                private final KafkaCommitExecuteStage stage$1;
                private final HashMap offsetsLogMap$1;
                private final Properties props$1;

                /*
                 * WARNING - void declaration
                 */
                public final Option<Object> apply(Row row) {
                    Option option;
                    String topic = row.getString(0);
                    int partitionId = row.getInt(1);
                    long offset = row.getLong(2) + 1L;
                    this.props$1.put("group.id", new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "-", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.stage$1.groupID(), BoxesRunTime.boxToInteger((int)partitionId)})));
                    KafkaConsumer kafkaConsumer = new KafkaConsumer(this.props$1);
                    try {
                        HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<TopicPartition, OffsetAndMetadata>();
                        offsetsMap.put(new TopicPartition(topic, partitionId), new OffsetAndMetadata(offset));
                        kafkaConsumer.commitSync(offsetsMap);
                        this.offsetsLogMap$1.put(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "-", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.stage$1.groupID(), BoxesRunTime.boxToInteger((int)partitionId)})), offset);
                        option = this.stage$1.stageDetail().put((Object)"offsets", (Object)this.offsetsLogMap$1);
                    }
                    catch (Throwable throwable) {
                        void var6_5;
                        var6_5.close();
                        throw throwable;
                    }
                    kafkaConsumer.close();
                    return option;
                }
                {
                    this.stage$1 = stage$1;
                    this.offsetsLogMap$1 = offsetsLogMap$1;
                    this.props$1 = props$1;
                }
            });
            return None$.MODULE$;
        }
        catch (Exception exception) {
            throw new DetailException(stage, exception){
                private final Map<String, Object> detail;

                public Map<String, Object> detail() {
                    return this.detail;
                }
                {
                    this.detail = stage$1.stageDetail();
                }
            };
        }
    }

    public KafkaCommitExecuteStage apply(KafkaCommitExecute plugin, String name, Option<String> description, String inputView, String bootstrapServers, String groupID, scala.collection.immutable.Map<String, String> params) {
        return new KafkaCommitExecuteStage(plugin, name, description, inputView, bootstrapServers, groupID, params);
    }

    public Option<Tuple7<KafkaCommitExecute, String, Option<String>, String, String, String, scala.collection.immutable.Map<String, String>>> unapply(KafkaCommitExecuteStage x$0) {
        return x$0 == null ? None$.MODULE$ : new Some((Object)new Tuple7((Object)x$0.plugin(), (Object)x$0.name(), x$0.description(), (Object)x$0.inputView(), (Object)x$0.bootstrapServers(), (Object)x$0.groupID(), x$0.params()));
    }

    private Object readResolve() {
        return MODULE$;
    }

    private KafkaCommitExecuteStage$() {
        MODULE$ = this;
    }
}

