/*
 * Decompiled with CFR 0.152.
 */
package apoc.kafka.consumer.kafka;

import apoc.kafka.consumer.kafka.KafkaAutoCommitEventConsumer;
import apoc.kafka.consumer.kafka.KafkaSinkConfiguration;
import apoc.kafka.consumer.kafka.KafkaTopicConfig;
import apoc.kafka.extensions.CommonExtensionsKt;
import apoc.kafka.service.StreamsSinkEntity;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import kotlin.Metadata;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlin.text.StringsKt;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jetbrains.annotations.NotNull;
import org.neo4j.logging.Log;

@Metadata(mv={1, 9, 0}, k=1, xi=48, d1={"\u0000V\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\"\n\u0002\u0010\u000e\n\u0002\b\u0003\n\u0002\u0010\u000b\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0010$\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0000\n\u0002\b\u0004\u0018\u00002\u00020\u0001B+\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\f\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007\u0012\u0006\u0010\t\u001a\u00020\b\u00a2\u0006\u0002\u0010\nJ$\u0010\r\u001a\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\f2\u0012\u0010\u0010\u001a\u000e\u0012\u0004\u0012\u00020\u0012\u0012\u0004\u0012\u00020\u00130\u0011H\u0002J\b\u0010\u0014\u001a\u00020\u000eH\u0002J(\u0010\u0015\u001a\u00020\u000e2\u001e\u0010\u0016\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00190\u0018\u0012\u0004\u0012\u00020\u000e0\u0017H\u0016J<\u0010\u0015\u001a\u00020\u000e2\u0012\u0010\u001a\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\u001b0\u00112\u001e\u0010\u0016\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00190\u0018\u0012\u0004\u0012\u00020\u000e0\u0017H\u0016J4\u0010\u001c\u001a\u000e\u0012\u0004\u0012\u00020\u0012\u0012\u0004\u0012\u00020\u00130\u00112\u001e\u0010\u0016\u001a\u001a\u0012\u0004\u0012\u00020\b\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00190\u0018\u0012\u0004\u0012\u00020\u000e0\u0017H\u0002J\b\u0010\u001d\u001a\u00020\u000eH\u0016J\b\u0010\u001e\u001a\u00020\u000eH\u0016R\u000e\u0010\u000b\u001a\u00020\fX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u001f"}, d2={"Lapoc/kafka/consumer/kafka/KafkaManualCommitEventConsumer;", "Lapoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer;", 
"config", "Lapoc/kafka/consumer/kafka/KafkaSinkConfiguration;", "log", "Lorg/neo4j/logging/Log;", "topics", "", "", "dbName", "(Lapoc/kafka/consumer/kafka/KafkaSinkConfiguration;Lorg/neo4j/logging/Log;Ljava/util/Set;Ljava/lang/String;)V", "asyncCommit", "", "commitData", "", "commit", "topicMap", "", "Lorg/apache/kafka/common/TopicPartition;", "Lorg/apache/kafka/clients/consumer/OffsetAndMetadata;", "doCommitSync", "read", "action", "Lkotlin/Function2;", "", "Lapoc/kafka/service/StreamsSinkEntity;", "topicConfig", "", "readSimple", "start", "stop", "apoc"})
@SourceDebugExtension(value={"SMAP\nKafkaManualCommitEventConsumer.kt\nKotlin\n*S Kotlin\n*F\n+ 1 KafkaManualCommitEventConsumer.kt\napoc/kafka/consumer/kafka/KafkaManualCommitEventConsumer\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,119:1\n1549#2:120\n1620#2,3:121\n*S KotlinDebug\n*F\n+ 1 KafkaManualCommitEventConsumer.kt\napoc/kafka/consumer/kafka/KafkaManualCommitEventConsumer\n*L\n109#1:120\n109#1:121,3\n*E\n"})
public final class KafkaManualCommitEventConsumer
extends KafkaAutoCommitEventConsumer {
    @NotNull
    private final Log log;
    private final boolean asyncCommit;

    /**
     * Creates the consumer and records whether offsets are to be committed asynchronously.
     *
     * <p>The null checks are performed inside the {@code super(...)} argument list (in the
     * order config, log, topics, dbName) because Java does not allow ordinary statements
     * ahead of the superclass constructor call. This keeps the checks running before the
     * parent constructor body, as the decompiled statement order intended.
     *
     * @param config sink configuration; {@code getAsyncCommit()} is read from it
     * @param log    logger used by this class for commit diagnostics
     * @param topics topics handed to the parent consumer
     * @param dbName database name handed to the parent consumer
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaManualCommitEventConsumer(@NotNull KafkaSinkConfiguration config, @NotNull Log log, @NotNull Set<String> topics, @NotNull String dbName) {
        super(
                requireParameter(config, "config"),
                requireParameter(log, "log"),
                requireParameter(topics, "topics"),
                requireParameter(dbName, "dbName"));
        this.log = log;
        this.asyncCommit = config.getAsyncCommit();
    }

    /** Applies the Kotlin parameter null check to {@code value} and returns it unchanged. */
    private static <T> T requireParameter(T value, String name) {
        Intrinsics.checkNotNullParameter((Object) value, name);
        return value;
    }

    /**
     * Stops the consumer.
     *
     * <p>When asynchronous commits are enabled, one final synchronous commit is attempted
     * first. {@code doCommitSync()} can throw (it rethrows {@link WakeupException}, and any
     * exception other than {@link CommitFailedException} is not caught there), so the parent
     * shutdown runs in a {@code finally} block: a failed last commit must not prevent
     * {@code super.stop()} from being invoked. The exception still propagates to the caller.
     */
    @Override
    public void stop() {
        try {
            if (this.asyncCommit) {
                this.doCommitSync();
            }
        } finally {
            super.stop();
        }
    }

    /**
     * Commits synchronously via the consumer's no-argument {@code commitSync()}.
     *
     * <p>If the commit is interrupted by a {@link WakeupException}, the commit is attempted
     * again and, once an attempt finishes without another wakeup, the most recent
     * {@link WakeupException} is rethrown so the caller still observes the interruption.
     * A {@link CommitFailedException} is logged as a warning and not propagated.
     * Any other exception propagates immediately.
     *
     * @throws WakeupException if any commit attempt was interrupted by a wakeup
     */
    private final void doCommitSync() {
        WakeupException pendingWakeup = null;
        boolean retry;
        do {
            retry = false;
            try {
                this.getConsumer().commitSync();
            } catch (WakeupException wakeup) {
                // Remember the interruption, but finish the commit before surfacing it.
                pendingWakeup = wakeup;
                retry = true;
            } catch (CommitFailedException failure) {
                this.log.warn("Commit failed", (Throwable) failure);
            }
        } while (retry);

        if (pendingWakeup != null) {
            throw pendingWakeup;
        }
    }

    /**
     * Starts consumption.
     *
     * <p>Without asynchronous commits this delegates entirely to {@code super.start()}.
     * With asynchronous commits it subscribes to the configured topics itself, installing a
     * rebalance listener that performs a synchronous commit whenever partitions are revoked.
     * If no topics are configured, an info message is logged and no subscription is made.
     */
    @Override
    public void start() {
        if (!this.asyncCommit) {
            super.start();
            return;
        }
        if (this.getTopics().isEmpty()) {
            this.log.info("No topics specified Kafka Consumer will not started");
            return;
        }
        this.getConsumer().subscribe((Collection<String>) this.getTopics(), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(@NotNull Collection<TopicPartition> partitions) {
                Intrinsics.checkNotNullParameter(partitions, (String) "partitions");
                // Commit synchronously while the revoked partitions are still being handed over.
                KafkaManualCommitEventConsumer.this.doCommitSync();
            }

            @Override
            public void onPartitionsAssigned(@NotNull Collection<TopicPartition> partitions) {
                // Nothing to do on assignment beyond the parameter null check.
                Intrinsics.checkNotNullParameter(partitions, (String) "partitions");
            }
        });
    }

    /**
     * Commits the given offsets, asynchronously or synchronously depending on
     * {@code asyncCommit}.
     *
     * <p>Does nothing when {@code commit} is {@code false} or {@code topicMap} is empty.
     * In asynchronous mode the completion callback delegates to
     * {@code commitData$lambda$0}, which logs a warning when the commit reports an exception.
     *
     * @param commit   whether a commit should be attempted at all
     * @param topicMap offsets to commit, keyed by partition
     */
    private final void commitData(boolean commit, Map<TopicPartition, ? extends OffsetAndMetadata> topicMap) {
        if (!commit || topicMap.isEmpty()) {
            return;
        }

        final boolean debug = this.log.isDebugEnabled();
        if (!this.asyncCommit) {
            if (debug) {
                this.log.debug("Committing data in sync");
            }
            this.getConsumer().commitSync(topicMap);
            return;
        }

        if (debug) {
            this.log.debug("Committing data in async");
        }
        this.getConsumer().commitAsync(
                topicMap,
                (committed, failure) -> KafkaManualCommitEventConsumer.commitData$lambda$0(this, committed, failure));
    }

    /**
     * Reads whatever a single poll returns, passes it to {@code action}, and then always
     * requests a commit of the resulting offsets (an empty result is skipped by
     * {@code commitData}).
     *
     * @param action callback receiving a topic name and a list of entities
     */
    @Override
    public void read(@NotNull Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action) {
        Intrinsics.checkNotNullParameter(action, (String)"action");
        this.commitData(true, this.readSimple(action));
    }

    /**
     * Reads according to a per-call topic configuration and then commits if that
     * configuration's {@code getCommit()} flag says so.
     *
     * <p>The configuration is parsed with {@code KafkaTopicConfig.Companion.fromMap}. When
     * its topic-partitions map is empty, a plain poll ({@code readSimple}) is used; otherwise
     * reading is delegated to the inherited {@code readFromPartition}.
     *
     * @param topicConfig raw configuration map to be parsed into a {@code KafkaTopicConfig}
     * @param action      callback receiving a topic name and a list of entities
     */
    @Override
    public void read(@NotNull Map<String, ? extends Object> topicConfig, @NotNull Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action) {
        Intrinsics.checkNotNullParameter(topicConfig, (String)"topicConfig");
        Intrinsics.checkNotNullParameter(action, (String)"action");

        final KafkaTopicConfig parsedConfig = KafkaTopicConfig.Companion.fromMap(topicConfig);
        final Map<TopicPartition, OffsetAndMetadata> consumedOffsets;
        if (parsedConfig.getTopicPartitionsMap().isEmpty()) {
            consumedOffsets = this.readSimple(action);
        } else {
            consumedOffsets = this.readFromPartition(parsedConfig, action);
        }
        this.commitData(parsedConfig.getCommit(), consumedOffsets);
    }

    /*
     * WARNING - void declaration
     */
    private final Map<TopicPartition, OffsetAndMetadata> readSimple(Function2<? super String, ? super List<StreamsSinkEntity>, Unit> action) {
        Map map;
        ConsumerRecords records = this.getConsumer().poll(Duration.ZERO);
        if (records.isEmpty()) {
            map = MapsKt.emptyMap();
        } else {
            void $this$mapTo$iv$iv;
            Set set = records.partitions();
            Intrinsics.checkNotNullExpressionValue((Object)set, (String)"partitions(...)");
            Iterable $this$map$iv = set;
            boolean $i$f$map = false;
            Iterable iterable = $this$map$iv;
            Collection destination$iv$iv = new ArrayList(CollectionsKt.collectionSizeOrDefault((Iterable)$this$map$iv, (int)10));
            boolean $i$f$mapTo = false;
            for (Object item$iv$iv : $this$mapTo$iv$iv) {
                void topicPartition;
                TopicPartition topicPartition2 = (TopicPartition)item$iv$iv;
                Collection collection = destination$iv$iv;
                boolean bl = false;
                List topicRecords = records.records((TopicPartition)topicPartition);
                String string = topicPartition.topic();
                Intrinsics.checkNotNullExpressionValue((Object)string, (String)"topic(...)");
                Intrinsics.checkNotNull((Object)topicRecords);
                this.executeAction(action, string, topicRecords);
                ConsumerRecord last = (ConsumerRecord)CollectionsKt.last((List)topicRecords);
                Intrinsics.checkNotNull((Object)last);
                collection.add(TuplesKt.to((Object)CommonExtensionsKt.topicPartition(last), (Object)CommonExtensionsKt.offsetAndMetadata$default(last, null, 1, null)));
            }
            map = MapsKt.toMap((Iterable)((List)destination$iv$iv));
        }
        return map;
    }

    /**
     * Completion callback body for the asynchronous commit issued by {@code commitData}.
     * Logs a warning naming the offsets when the commit reported an exception; does nothing
     * on success.
     *
     * @param this$0    the consumer whose logger is used
     * @param offsets   the offsets the commit was attempted for
     * @param exception the failure reported for the commit, or {@code null} on success
     */
    private static final void commitData$lambda$0(KafkaManualCommitEventConsumer this$0, Map offsets, Exception exception) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        Intrinsics.checkNotNullParameter((Object)offsets, (String)"offsets");
        if (exception == null) {
            return;
        }
        // The margin-prefixed template is collapsed by trimMargin with its default prefix.
        final String template = "\n                            |These offsets `" + offsets + "`\n                            |cannot be committed because of the following exception:\n                        ";
        final String message = StringsKt.trimMargin$default((String)template, null, (int)1, null);
        this$0.log.warn(message, (Throwable)exception);
    }

    /**
     * Synthetic accessor: forwards to the private {@code doCommitSync()} of the given
     * instance so that code outside the instance's own methods can trigger it.
     *
     * @param $this the consumer on which to run the synchronous commit
     */
    public static final /* synthetic */ void access$doCommitSync(KafkaManualCommitEventConsumer $this) {
        $this.doCommitSync();
    }
}

