/*
 * Decompiled with CFR 0.152.
 */
package apoc.kafka.service.errors;

import apoc.kafka.service.errors.ErrorData;
import apoc.kafka.service.errors.ErrorService;
import apoc.kafka.service.errors.ProcessingError;
import apoc.kafka.utils.KafkaUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlin.text.Charsets;
import kotlin.text.StringsKt;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.util.VisibleForTesting;

@Metadata(mv={1, 9, 0}, k=1, xi=48, d1={"\u0000P\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n\u0002\b\u0003\n\u0002\u0010$\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010 \n\u0002\b\u0002\u0018\u0000 \u00192\u00020\u0001:\u0001\u0019B9\b\u0016\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012 \u0010\u0006\u001a\u001c\u0012\u0004\u0012\u00020\b\u0012\f\u0012\n\u0018\u00010\tj\u0004\u0018\u0001`\n\u0012\u0004\u0012\u00020\u000b0\u0007\u00a2\u0006\u0002\u0010\fBE\u0012\u0014\u0010\r\u001a\u0010\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f\u0018\u00010\u000e\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012 \u0010\u0006\u001a\u001c\u0012\u0004\u0012\u00020\b\u0012\f\u0012\n\u0018\u00010\tj\u0004\u0018\u0001`\n\u0012\u0004\u0012\u00020\u000b0\u0007\u00a2\u0006\u0002\u0010\u0010J\b\u0010\u0011\u001a\u00020\u000bH\u0016J\u001c\u0010\u0012\u001a\u000e\u0012\u0004\u0012\u00020\b\u0012\u0004\u0012\u00020\u000f0\u00132\u0006\u0010\u0014\u001a\u00020\u0015H\u0007J\u0016\u0010\u0016\u001a\u00020\u000b2\f\u0010\u0017\u001a\b\u0012\u0004\u0012\u00020\u00150\u0018H\u0016R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R(\u0010\u0006\u001a\u001c\u0012\u0004\u0012\u00020\b\u0012\f\u0012\n\u0018\u00010\tj\u0004\u0018\u0001`\n\u0012\u0004\u0012\u00020\u000b0\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u001c\u0010\r\u001a\u0010\u0012\u0004\u0012\u00020\u000f\u0012\u0004\u0012\u00020\u000f\u0018\u00010\u000eX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u001a"}, d2={"Lapoc/kafka/service/errors/KafkaErrorService;", "Lapoc/kafka/service/errors/ErrorService;", "config", "Ljava/util/Properties;", "errorConfig", "Lapoc/kafka/service/errors/ErrorService$ErrorConfig;", "log", "Lkotlin/Function2;", "", "Ljava/lang/Exception;", "Lkotlin/Exception;", "", "(Ljava/util/Properties;Lapoc/kafka/service/errors/ErrorService$ErrorConfig;Lkotlin/jvm/functions/Function2;)V", "producer", "Lorg/apache/kafka/clients/producer/Producer;", "", "(Lorg/apache/kafka/clients/producer/Producer;Lapoc/kafka/service/errors/ErrorService$ErrorConfig;Lkotlin/jvm/functions/Function2;)V", "close", "populateContextHeaders", "", "errorData", "Lapoc/kafka/service/errors/ErrorData;", "report", "errorDatas", "", "Companion", "apoc"})
@SourceDebugExtension(value={"SMAP\nKafkaErrorService.kt\nKotlin\n*S Kotlin\n*F\n+ 1 KafkaErrorService.kt\napoc/kafka/service/errors/KafkaErrorService\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 _Maps.kt\nkotlin/collections/MapsKt___MapsKt\n*L\n1#1,97:1\n1855#2,2:98\n1549#2:100\n1620#2,3:101\n1855#2,2:104\n1855#2:106\n1856#2:109\n215#3,2:107\n*S KotlinDebug\n*F\n+ 1 KafkaErrorService.kt\napoc/kafka/service/errors/KafkaErrorService\n*L\n40#1:98,2\n42#1:100\n42#1:101,3\n42#1:104,2\n46#1:106\n46#1:109\n56#1:107,2\n*E\n"})
public final class KafkaErrorService
extends ErrorService {
    @NotNull
    public static final Companion Companion = new Companion(null);
    @Nullable
    private final Producer<byte[], byte[]> producer;
    @NotNull
    private final ErrorService.ErrorConfig errorConfig;
    @NotNull
    private final Function2<String, Exception, Unit> log;

    public KafkaErrorService(@Nullable Producer<byte[], byte[]> producer, @NotNull ErrorService.ErrorConfig errorConfig, @NotNull Function2<? super String, ? super Exception, Unit> log) {
        Intrinsics.checkNotNullParameter((Object)errorConfig, (String)"errorConfig");
        Intrinsics.checkNotNullParameter(log, (String)"log");
        super(null, 1, null);
        this.producer = producer;
        this.errorConfig = errorConfig;
        this.log = log;
    }

    public KafkaErrorService(@NotNull Properties config, @NotNull ErrorService.ErrorConfig errorConfig, @NotNull Function2<? super String, ? super Exception, Unit> log) {
        Intrinsics.checkNotNullParameter((Object)config, (String)"config");
        Intrinsics.checkNotNullParameter((Object)errorConfig, (String)"errorConfig");
        Intrinsics.checkNotNullParameter(log, (String)"log");
        this((Producer<byte[], byte[]>)((Producer)KafkaErrorService.Companion.producer(errorConfig, config, (Function2<? super String, ? super Exception, Unit>)log)), errorConfig, log);
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public void report(@NotNull List<ErrorData> errorDatas) {
        boolean $i$f$forEach;
        Iterable $this$forEach$iv;
        Intrinsics.checkNotNullParameter(errorDatas, (String)"errorDatas");
        if (this.errorConfig.getFail()) {
            throw new ProcessingError(errorDatas);
        }
        if (this.errorConfig.getLog()) {
            if (this.errorConfig.getLogMessages()) {
                $this$forEach$iv = errorDatas;
                $i$f$forEach = false;
                for (Object element$iv : $this$forEach$iv) {
                    ErrorData it = (ErrorData)element$iv;
                    boolean bl = false;
                    this.log.invoke((Object)it.toLogString(), (Object)it.getException());
                }
            } else {
                void $this$mapTo$iv$iv;
                Iterable $this$map$iv = errorDatas;
                boolean $i$f$map = false;
                Iterator iterator = $this$map$iv;
                Collection destination$iv$iv = new ArrayList(CollectionsKt.collectionSizeOrDefault((Iterable)$this$map$iv, (int)10));
                boolean $i$f$mapTo = false;
                for (Object item$iv$iv : $this$mapTo$iv$iv) {
                    void it;
                    ErrorData errorData = (ErrorData)item$iv$iv;
                    Collection collection = destination$iv$iv;
                    boolean bl = false;
                    collection.add(it.getException());
                }
                $this$forEach$iv = CollectionsKt.distinct((Iterable)((List)destination$iv$iv));
                $i$f$forEach = false;
                for (Object element$iv : $this$forEach$iv) {
                    Exception it = (Exception)element$iv;
                    boolean bl = false;
                    this.log.invoke((Object)("Error processing " + errorDatas.size() + " messages"), (Object)it);
                }
            }
        }
        $this$forEach$iv = errorDatas;
        $i$f$forEach = false;
        for (Object element$iv : $this$forEach$iv) {
            ErrorData dlqData = (ErrorData)element$iv;
            boolean bl = false;
            if (this.producer == null) continue;
            boolean bl2 = false;
            try {
                Producer<byte[], byte[]> it;
                ProducerRecord producerRecord;
                ProducerRecord producerRecord2 = producerRecord = dlqData.getTimestamp() == -1L ? new ProducerRecord(this.errorConfig.getDlqTopic(), null, (Object)dlqData.getKey(), (Object)dlqData.getValue()) : new ProducerRecord(this.errorConfig.getDlqTopic(), null, Long.valueOf(dlqData.getTimestamp()), (Object)dlqData.getKey(), (Object)dlqData.getValue());
                if (this.errorConfig.getDlqHeaders()) {
                    Headers producerHeader = producerRecord.headers();
                    Map<String, byte[]> $this$forEach$iv2 = this.populateContextHeaders(dlqData);
                    boolean $i$f$forEach2 = false;
                    Iterator<Map.Entry<String, byte[]>> iterator = $this$forEach$iv2.entrySet().iterator();
                    while (iterator.hasNext()) {
                        Map.Entry<String, byte[]> element$iv2;
                        Map.Entry<String, byte[]> entry = element$iv2 = iterator.next();
                        boolean bl3 = false;
                        String key = entry.getKey();
                        byte[] value = entry.getValue();
                        producerHeader.add(key, value);
                    }
                }
                it.send(producerRecord);
            }
            catch (Exception e) {
                this.log.invoke((Object)("Error writing to DLQ " + e + ": " + dlqData.toLogString()), (Object)e);
            }
        }
    }

    @VisibleForTesting
    @NotNull
    public final Map<String, byte[]> populateContextHeaders(@NotNull ErrorData errorData) {
        Object object;
        Intrinsics.checkNotNullParameter((Object)errorData, (String)"errorData");
        Object object2 = new Pair[3];
        String string = KafkaErrorService.populateContextHeaders$prefix(this, "topic");
        String string2 = errorData.getOriginalTopic();
        byte[] byArray = string2.getBytes(Charsets.UTF_8);
        Intrinsics.checkNotNullExpressionValue((Object)byArray, (String)"getBytes(...)");
        object2[0] = TuplesKt.to((Object)string, (Object)byArray);
        String string3 = KafkaErrorService.populateContextHeaders$prefix(this, "partition");
        string2 = errorData.getPartition();
        byte[] byArray2 = string2.getBytes(Charsets.UTF_8);
        Intrinsics.checkNotNullExpressionValue((Object)byArray2, (String)"getBytes(...)");
        object2[1] = TuplesKt.to((Object)string3, (Object)byArray2);
        String string4 = KafkaErrorService.populateContextHeaders$prefix(this, "offset");
        string2 = errorData.getOffset();
        byte[] byArray3 = string2.getBytes(Charsets.UTF_8);
        Intrinsics.checkNotNullExpressionValue((Object)byArray3, (String)"getBytes(...)");
        object2[2] = TuplesKt.to((Object)string4, (Object)byArray3);
        Map headers = MapsKt.mutableMapOf((Pair[])object2);
        object2 = errorData.getDatabaseName();
        if (!(object2 == null || StringsKt.isBlank((CharSequence)object2))) {
            object2 = headers;
            string2 = KafkaErrorService.populateContextHeaders$prefix(this, "databaseName");
            object = errorData.getDatabaseName();
            byte[] byArray4 = ((String)object).getBytes(Charsets.UTF_8);
            Intrinsics.checkNotNullExpressionValue((Object)byArray4, (String)"getBytes(...)");
            object = byArray4;
            object2.put(string2, object);
        }
        if (errorData.getExecutingClass() != null) {
            object2 = headers;
            string2 = KafkaErrorService.populateContextHeaders$prefix(this, "class.name");
            String string5 = errorData.getExecutingClass().getName();
            Intrinsics.checkNotNullExpressionValue((Object)string5, (String)"getName(...)");
            object = string5;
            byte[] byArray5 = ((String)object).getBytes(Charsets.UTF_8);
            Intrinsics.checkNotNullExpressionValue((Object)byArray5, (String)"getBytes(...)");
            object = byArray5;
            object2.put(string2, object);
        }
        if (errorData.getException() != null) {
            object2 = headers;
            string2 = KafkaErrorService.populateContextHeaders$prefix(this, "exception.class.name");
            String string6 = errorData.getException().getClass().getName();
            Intrinsics.checkNotNullExpressionValue((Object)string6, (String)"getName(...)");
            object = string6;
            byte[] byArray6 = ((String)object).getBytes(Charsets.UTF_8);
            Intrinsics.checkNotNullExpressionValue((Object)byArray6, (String)"getBytes(...)");
            object = byArray6;
            object2.put(string2, object);
            if (errorData.getException().getMessage() != null) {
                object2 = headers;
                string2 = KafkaErrorService.populateContextHeaders$prefix(this, "exception.message");
                object = String.valueOf(errorData.getException().getMessage());
                byte[] byArray7 = ((String)object).getBytes(Charsets.UTF_8);
                Intrinsics.checkNotNullExpressionValue((Object)byArray7, (String)"getBytes(...)");
                object = byArray7;
                object2.put(string2, object);
            }
            object2 = headers;
            string2 = KafkaErrorService.populateContextHeaders$prefix(this, "exception.stacktrace");
            String string7 = ExceptionUtils.getStackTrace((Throwable)errorData.getException());
            Intrinsics.checkNotNullExpressionValue((Object)string7, (String)"getStackTrace(...)");
            object = string7;
            byte[] byArray8 = ((String)object).getBytes(Charsets.UTF_8);
            Intrinsics.checkNotNullExpressionValue((Object)byArray8, (String)"getBytes(...)");
            object = byArray8;
            object2.put(string2, object);
        }
        return headers;
    }

    @Override
    public void close() {
        block0: {
            Producer<byte[], byte[]> producer = this.producer;
            if (producer == null) break block0;
            producer.close();
        }
    }

    private static final String populateContextHeaders$prefix(KafkaErrorService this$0, String suffix) {
        return this$0.errorConfig.getDlqHeaderPrefix() + suffix;
    }

    @Metadata(mv={1, 9, 0}, k=1, xi=48, d1={"\u00008\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0000\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002\u00a2\u0006\u0002\u0010\u0002JH\u0010\u0003\u001a\u0010\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u0005\u0018\u00010\u00042\u0006\u0010\u0006\u001a\u00020\u00072\u0006\u0010\b\u001a\u00020\t2 \u0010\n\u001a\u001c\u0012\u0004\u0012\u00020\f\u0012\f\u0012\n\u0018\u00010\rj\u0004\u0018\u0001`\u000e\u0012\u0004\u0012\u00020\u000f0\u000bH\u0002\u00a8\u0006\u0010"}, d2={"Lapoc/kafka/service/errors/KafkaErrorService$Companion;", "", "()V", "producer", "Lorg/apache/kafka/clients/producer/KafkaProducer;", "", "errorConfig", "Lapoc/kafka/service/errors/ErrorService$ErrorConfig;", "config", "Ljava/util/Properties;", "log", "Lkotlin/Function2;", "", "Ljava/lang/Exception;", "Lkotlin/Exception;", "", "apoc"})
    public static final class Companion {
        private Companion() {
        }

        private final KafkaProducer<byte[], byte[]> producer(ErrorService.ErrorConfig errorConfig, Properties config, Function2<? super String, ? super Exception, Unit> log) {
            KafkaProducer kafkaProducer;
            String string = errorConfig.getDlqTopic();
            if (string != null) {
                KafkaProducer kafkaProducer2;
                String it = string;
                boolean bl = false;
                try {
                    String bootstrapServers = config.getOrDefault((Object)"bootstrap.servers", "").toString();
                    KafkaUtil.INSTANCE.validateConnection(bootstrapServers, "bootstrap.servers", false);
                    ((Map)config).put("key.serializer", ByteArraySerializer.class.getName());
                    ((Map)config).put("value.serializer", ByteArraySerializer.class.getName());
                    kafkaProducer2 = new KafkaProducer(config);
                }
                catch (Exception e) {
                    log.invoke((Object)"Cannot initialize the custom DLQ because of the following exception: ", (Object)e);
                    kafkaProducer2 = null;
                }
                kafkaProducer = kafkaProducer2;
            } else {
                kafkaProducer = null;
            }
            return kafkaProducer;
        }

        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

