/*
 * Decompiled with CFR 0.152.
 */
package apoc.kafka.producer.kafka;

import apoc.kafka.events.KafkaStatus;
import apoc.kafka.events.StreamsEvent;
import apoc.kafka.events.StreamsTransactionEvent;
import apoc.kafka.extensions.GraphDatabaseServerExtensionsKt;
import apoc.kafka.producer.ExtensionsKt;
import apoc.kafka.producer.StreamsEventRouterConfiguration;
import apoc.kafka.producer.kafka.KafkaAdminService;
import apoc.kafka.producer.kafka.KafkaConfiguration;
import apoc.kafka.producer.kafka.Neo4jKafkaProducer;
import apoc.kafka.utils.JSONUtils;
import apoc.kafka.utils.KafkaUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.UninitializedPropertyAccessException;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.sync.Mutex;
import kotlinx.coroutines.sync.MutexKt;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.logging.Log;

@Metadata(mv={1, 9, 0}, k=1, xi=48, d1={"\u0000z\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0010$\n\u0002\u0010\u000e\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0010\u0012\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010 \n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018\u00002\u00020\u0001B)\u0012\u0012\u0010\u0002\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00040\u0003\u0012\u0006\u0010\u0005\u001a\u00020\u0006\u0012\u0006\u0010\u0007\u001a\u00020\b\u00a2\u0006\u0002\u0010\tJ8\u0010\u001e\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u0001\u0018\u00010\u00032\u0016\u0010\u001f\u001a\u0012\u0012\u0006\u0012\u0004\u0018\u00010\u001d\u0012\u0006\u0012\u0004\u0018\u00010\u001d0 2\b\b\u0002\u0010!\u001a\u00020\"H\u0002JF\u0010#\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u0001\u0018\u00010\u00032\u0006\u0010$\u001a\u00020\u00042\u0006\u0010%\u001a\u00020&2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u00032\b\b\u0002\u0010!\u001a\u00020\"H\u0002J.\u0010#\u001a\u00020'2\u0006\u0010$\u001a\u00020\u00042\u0006\u0010%\u001a\u00020(2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u0003H\u0002J4\u0010)\u001a\u00020'2\u0006\u0010$\u001a\u00020\u00042\u000e\u0010*\u001a\n\u0012\u0006\b\u0001\u0012\u00020&0+2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u0003JF\u0010,\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00010\u00030+2\u0006\u0010$\u001a\u00020\u00042\u000e\u0010*\u001a\n\u0012\u0006\b\u0001\u0012\u00020&0+2\u0014\u0010\u0002\u001a\u0010\u0012\u0004\u0012\u00020\u0004\u0012\u0006\u0012\u0004\u0018\u00010\u00010\u0003J\u0006\u0010-\u001a\u00020'J\u001a\u0010.\u001a\u00020/2\u0010\u0010\u001b\u001a\f\u0012\u0002\b\u0003\u0012\u0002\b\u0003\u0018\u00010\u001cH\u0002J\u0006\u00100\u001a\u00020'R\u001a\u0010\u0002\u001a\u000e\u0012\u0004\u0012\u00020\u0004\u0012\u0004\u0012\u00020\u00040\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0011\u0010\n\u001a\u00020\u000b\u00a2\u0006\b\n\u0000\u001a\u0004\b\f\u0010\rR\u001b\u0010\u000e\u001a\u00020\u000f8BX\u0082\u0084\u0002\u00a2\u0006\f\n\u0004\b\u0012\u0010\u0013\u001a\u0004\b\u0010\u0010\u0011R\u001b\u0010\u0014\u001a\u00020\u00158BX\u0082\u0084\u0002\u00a2\u0006\f\n\u0004\b\u0018\u0010\u0013\u001a\u0004\b\u0016\u0010\u0017R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0019\u001a\u00020\u001aX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u001c\u0010\u001b\u001a\u0010\u0012\u0004\u0012\u00020\u001d\u0012\u0004\u0012\u00020\u001d\u0018\u00010\u001cX\u0082\u000e\u00a2\u0006\u0002\n\u0000\u00a8\u00061"}, d2={"Lapoc/kafka/producer/kafka/KafkaEventRouter;", "", "config", "", "", "db", "Lorg/neo4j/graphdb/GraphDatabaseService;", "log", "Lorg/neo4j/logging/Log;", "(Ljava/util/Map;Lorg/neo4j/graphdb/GraphDatabaseService;Lorg/neo4j/logging/Log;)V", "eventRouterConfiguration", "Lapoc/kafka/producer/StreamsEventRouterConfiguration;", "getEventRouterConfiguration", "()Lapoc/kafka/producer/StreamsEventRouterConfiguration;", "kafkaAdminService", "Lapoc/kafka/producer/kafka/KafkaAdminService;", "getKafkaAdminService", "()Lapoc/kafka/producer/kafka/KafkaAdminService;", "kafkaAdminService$delegate", "Lkotlin/Lazy;", "kafkaConfig", "Lapoc/kafka/producer/kafka/KafkaConfiguration;", "getKafkaConfig", "()Lapoc/kafka/producer/kafka/KafkaConfiguration;", "kafkaConfig$delegate", "mutex", "Lkotlinx/coroutines/sync/Mutex;", "producer", "Lapoc/kafka/producer/kafka/Neo4jKafkaProducer;", "", "send", "producerRecord", "Lorg/apache/kafka/clients/producer/ProducerRecord;", "sync", "", "sendEvent", "topic", "event", "Lapoc/kafka/events/StreamsEvent;", "", "Lapoc/kafka/events/StreamsTransactionEvent;", "sendEvents", "transactionEvents", "", "sendEventsSync", "start", "status", "Lapoc/kafka/events/KafkaStatus;", "stop", "apoc"})
@SourceDebugExtension(value={"SMAP\nKafkaEventRouter.kt\nKotlin\n*S Kotlin\n*F\n+ 1 KafkaEventRouter.kt\napoc/kafka/producer/kafka/KafkaEventRouter\n+ 2 fake.kt\nkotlin/jvm/internal/FakeKt\n+ 3 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,194:1\n1#2:195\n1#2:206\n1603#3,9:196\n1855#3:205\n1856#3:207\n1612#3:208\n1855#3,2:209\n*S KotlinDebug\n*F\n+ 1 KafkaEventRouter.kt\napoc/kafka/producer/kafka/KafkaEventRouter\n*L\n128#1:206\n128#1:196,9\n128#1:205\n128#1:207\n128#1:208\n139#1:209,2\n*E\n"})
public final class KafkaEventRouter {
    @NotNull
    private final Map<String, String> config;
    @NotNull
    private final GraphDatabaseService db;
    @NotNull
    private final Log log;
    @NotNull
    private final StreamsEventRouterConfiguration eventRouterConfiguration;
    @NotNull
    private final Mutex mutex;
    @Nullable
    private Neo4jKafkaProducer<byte[], byte[]> producer;
    @NotNull
    private final Lazy kafkaConfig$delegate;
    @NotNull
    private final Lazy kafkaAdminService$delegate;

    public KafkaEventRouter(@NotNull Map<String, String> config, @NotNull GraphDatabaseService db, @NotNull Log log) {
        Intrinsics.checkNotNullParameter(config, (String)"config");
        Intrinsics.checkNotNullParameter((Object)db, (String)"db");
        Intrinsics.checkNotNullParameter((Object)log, (String)"log");
        this.config = config;
        this.db = db;
        this.log = log;
        String string = this.db.databaseName();
        Intrinsics.checkNotNullExpressionValue((Object)string, (String)"databaseName(...)");
        this.eventRouterConfiguration = StreamsEventRouterConfiguration.Companion.from(this.config, string, GraphDatabaseServerExtensionsKt.isDefaultDb(this.db), this.log);
        this.mutex = MutexKt.Mutex$default((boolean)false, (int)1, null);
        this.kafkaConfig$delegate = LazyKt.lazy((Function0)((Function0)new Function0<KafkaConfiguration>(this){
            final /* synthetic */ KafkaEventRouter this$0;
            {
                this.this$0 = $receiver;
                super(0);
            }

            @NotNull
            public final KafkaConfiguration invoke() {
                return KafkaConfiguration.Companion.from(KafkaEventRouter.access$getConfig$p(this.this$0), KafkaEventRouter.access$getLog$p(this.this$0));
            }
        }));
        this.kafkaAdminService$delegate = LazyKt.lazy((Function0)((Function0)new Function0<KafkaAdminService>(this){
            final /* synthetic */ KafkaEventRouter this$0;
            {
                this.this$0 = $receiver;
                super(0);
            }

            @NotNull
            public final KafkaAdminService invoke() {
                return new KafkaAdminService(KafkaEventRouter.access$getKafkaConfig(this.this$0), KafkaEventRouter.access$getLog$p(this.this$0));
            }
        }));
    }

    @NotNull
    public final StreamsEventRouterConfiguration getEventRouterConfiguration() {
        return this.eventRouterConfiguration;
    }

    private final KafkaConfiguration getKafkaConfig() {
        Lazy lazy = this.kafkaConfig$delegate;
        return (KafkaConfiguration)lazy.getValue();
    }

    private final KafkaAdminService getKafkaAdminService() {
        Lazy lazy = this.kafkaAdminService$delegate;
        return (KafkaAdminService)lazy.getValue();
    }

    private final KafkaStatus status(Neo4jKafkaProducer<?, ?> producer) {
        return producer != null ? KafkaStatus.RUNNING : KafkaStatus.STOPPED;
    }

    public final void start() {
        BuildersKt.runBlocking$default(null, (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super Unit>, Object>(this, null){
            Object L$0;
            Object L$1;
            Object L$2;
            int label;
            final /* synthetic */ KafkaEventRouter this$0;
            {
                this.this$0 = $receiver;
                super(2, $completion);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Unable to fully structure code
             */
            @Nullable
            public final Object invokeSuspend(@NotNull Object var1_1) {
                var10_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0: {
                        ResultKt.throwOnFailure((Object)var1_1);
                        var2_3 = KafkaEventRouter.access$getMutex$p(this.this$0);
                        var3_4 = KafkaEventRouter.access$getProducer$p(this.this$0);
                        var4_5 = this.this$0;
                        $i$f$withLock = false;
                        this.L$0 = $this$withLock$iv;
                        this.L$1 = owner$iv;
                        this.L$2 = var4_5;
                        this.label = 1;
                        v0 = $this$withLock$iv.lock((Object)owner$iv, (Continuation)this);
                        if (v0 == var10_2) {
                            return var10_2;
                        }
                        ** GOTO lbl24
                    }
                    case 1: {
                        $i$f$withLock = false;
                        var4_5 = (KafkaEventRouter)this.L$2;
                        owner$iv = (Neo4jKafkaProducer)this.L$1;
                        $this$withLock$iv = (Mutex)this.L$0;
                        ResultKt.throwOnFailure((Object)$result);
                        v0 = $result;
lbl24:
                        // 2 sources

                        try {
                            $i$a$-withLock-KafkaEventRouter$start$1$1 = false;
                            if (KafkaEventRouter.access$status(var4_5, KafkaEventRouter.access$getProducer$p(var4_5)) == KafkaStatus.RUNNING) {
                                var9_9 = Unit.INSTANCE;
                                return var9_9;
                            }
                            KafkaEventRouter.access$getLog$p(var4_5).info("Initialising Kafka Connector");
                            props = KafkaEventRouter.access$getKafkaConfig(var4_5).asProperties();
                            KafkaEventRouter.access$setProducer$p(var4_5, new Neo4jKafkaProducer<K, V>(props));
                            v1 = KafkaEventRouter.access$getProducer$p(var4_5);
                            Intrinsics.checkNotNull((Object)v1);
                            v1.initTransactions();
                            KafkaEventRouter.access$getLog$p(var4_5).info("Kafka Connector started");
                            var8_11 = Unit.INSTANCE;
                        }
                        finally {
                            $this$withLock$iv.unlock((Object)owner$iv);
                        }
                        return Unit.INSTANCE;
                    }
                }
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            @NotNull
            public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
                return (Continuation)new /* invalid duplicate definition of identical inner class */;
            }

            @Nullable
            public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super Unit> p2) {
                return (this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
            }
        }), (int)1, null);
    }

    public final void stop() {
        BuildersKt.runBlocking$default(null, (Function2)((Function2)new Function2<CoroutineScope, Continuation<? super Unit>, Object>(this, null){
            Object L$0;
            Object L$1;
            Object L$2;
            int label;
            final /* synthetic */ KafkaEventRouter this$0;
            {
                this.this$0 = $receiver;
                super(2, $completion);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Unable to fully structure code
             */
            @Nullable
            public final Object invokeSuspend(@NotNull Object var1_1) {
                var10_2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0: {
                        ResultKt.throwOnFailure((Object)var1_1);
                        var2_3 = KafkaEventRouter.access$getMutex$p(this.this$0);
                        var3_4 = KafkaEventRouter.access$getProducer$p(this.this$0);
                        var4_5 = this.this$0;
                        $i$f$withLock = false;
                        this.L$0 = $this$withLock$iv;
                        this.L$1 = owner$iv;
                        this.L$2 = var4_5;
                        this.label = 1;
                        v0 = $this$withLock$iv.lock((Object)owner$iv, (Continuation)this);
                        if (v0 == var10_2) {
                            return var10_2;
                        }
                        ** GOTO lbl24
                    }
                    case 1: {
                        $i$f$withLock = false;
                        var4_5 = (KafkaEventRouter)this.L$2;
                        owner$iv = (Neo4jKafkaProducer)this.L$1;
                        $this$withLock$iv = (Mutex)this.L$0;
                        ResultKt.throwOnFailure((Object)$result);
                        v0 = $result;
lbl24:
                        // 2 sources

                        try {
                            $i$a$-withLock-KafkaEventRouter$stop$1$1 = false;
                            if (KafkaEventRouter.access$status(var4_5, KafkaEventRouter.access$getProducer$p(var4_5)) == KafkaStatus.STOPPED) {
                                var9_9 = Unit.INSTANCE;
                                return var9_9;
                            }
                            var7_10 = new Class[]{UninitializedPropertyAccessException.class};
                            KafkaUtil.INSTANCE.ignoreExceptions((Function0)new Function0<Unit>(var4_5){
                                final /* synthetic */ KafkaEventRouter this$0;
                                {
                                    this.this$0 = $receiver;
                                    super(0);
                                }

                                @Nullable
                                public final Unit invoke() {
                                    Unit unit;
                                    Neo4jKafkaProducer neo4jKafkaProducer = KafkaEventRouter.access$getProducer$p(this.this$0);
                                    if (neo4jKafkaProducer != null) {
                                        neo4jKafkaProducer.flush();
                                        unit = Unit.INSTANCE;
                                    } else {
                                        unit = null;
                                    }
                                    return unit;
                                }
                            }, var7_10);
                            var7_10 = new Class[]{UninitializedPropertyAccessException.class};
                            KafkaUtil.INSTANCE.ignoreExceptions((Function0)new Function0<Unit>(var4_5){
                                final /* synthetic */ KafkaEventRouter this$0;
                                {
                                    this.this$0 = $receiver;
                                    super(0);
                                }

                                @Nullable
                                public final Unit invoke() {
                                    Unit unit;
                                    Neo4jKafkaProducer neo4jKafkaProducer = KafkaEventRouter.access$getProducer$p(this.this$0);
                                    if (neo4jKafkaProducer != null) {
                                        neo4jKafkaProducer.close();
                                        unit = Unit.INSTANCE;
                                    } else {
                                        unit = null;
                                    }
                                    return unit;
                                }
                            }, var7_10);
                            var7_10 = new Class[]{UninitializedPropertyAccessException.class};
                            KafkaUtil.INSTANCE.ignoreExceptions((Function0)new Function0<Unit>(var4_5){
                                final /* synthetic */ KafkaEventRouter this$0;
                                {
                                    this.this$0 = $receiver;
                                    super(0);
                                }

                                public final void invoke() {
                                    KafkaEventRouter.access$getKafkaAdminService(this.this$0).stop();
                                }
                            }, var7_10);
                            KafkaEventRouter.access$setProducer$p(var4_5, null);
                            var8_11 = Unit.INSTANCE;
                        }
                        finally {
                            $this$withLock$iv.unlock((Object)owner$iv);
                        }
                        return Unit.INSTANCE;
                    }
                }
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            @NotNull
            public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
                return (Continuation)new /* invalid duplicate definition of identical inner class */;
            }

            @Nullable
            public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super Unit> p2) {
                return (this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
            }
        }), (int)1, null);
    }

    private final Map<String, Object> send(ProducerRecord<byte[], byte[]> producerRecord, boolean sync) {
        Map<String, Object> map;
        KafkaAdminService kafkaAdminService2 = this.getKafkaAdminService();
        String string = producerRecord.topic();
        Intrinsics.checkNotNullExpressionValue((Object)string, (String)"topic(...)");
        if (!kafkaAdminService2.isValidTopic(string)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Error while sending record to " + producerRecord.topic() + ", because it doesn't exists");
            }
            return null;
        }
        if (sync) {
            Object object = this.producer;
            map = object != null && (object = object.send(producerRecord)) != null && (object = (RecordMetadata)object.get()) != null ? ExtensionsKt.toMap((RecordMetadata)object) : null;
        } else {
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
            if (neo4jKafkaProducer != null) {
                neo4jKafkaProducer.send(producerRecord, (arg_0, arg_1) -> KafkaEventRouter.send$lambda$0(this, producerRecord, arg_0, arg_1));
            }
            map = null;
        }
        return map;
    }

    static /* synthetic */ Map send$default(KafkaEventRouter kafkaEventRouter, ProducerRecord producerRecord, boolean bl, int n, Object object) {
        if ((n & 2) != 0) {
            bl = false;
        }
        return kafkaEventRouter.send((ProducerRecord<byte[], byte[]>)producerRecord, bl);
    }

    /*
     * WARNING - void declaration
     */
    private final Map<String, Object> sendEvent(String topic, StreamsEvent event, Map<String, ? extends Object> config, boolean sync) {
        byte[] byArray;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Trying to send a simple event with payload " + event.getPayload() + " to kafka");
        }
        Object key = config.getOrDefault("key", UUID.randomUUID().toString());
        Object object = config.get("partition");
        Integer partition = object != null && (object = object.toString()) != null ? Integer.valueOf(Integer.parseInt((String)object)) : null;
        String string = topic;
        Integer n = partition;
        Long l = System.currentTimeMillis();
        Object object2 = key;
        if (object2 != null) {
            void it;
            Object object3 = object2;
            Long l2 = l;
            Integer n2 = n;
            String string2 = string;
            boolean bl = false;
            byte[] byArray2 = JSONUtils.INSTANCE.writeValueAsBytes(it);
            string = string2;
            n = n2;
            l = l2;
            byArray = byArray2;
        } else {
            byArray = null;
        }
        byte[] byArray3 = JSONUtils.INSTANCE.writeValueAsBytes(event);
        byte[] byArray4 = byArray;
        Long l3 = l;
        Integer n3 = n;
        String string3 = string;
        ProducerRecord producerRecord = new ProducerRecord(string3, n3, l3, (Object)byArray4, (Object)byArray3);
        return this.send((ProducerRecord<byte[], byte[]>)producerRecord, sync);
    }

    static /* synthetic */ Map sendEvent$default(KafkaEventRouter kafkaEventRouter, String string, StreamsEvent streamsEvent, Map map, boolean bl, int n, Object object) {
        if ((n & 8) != 0) {
            bl = false;
        }
        return kafkaEventRouter.sendEvent(string, streamsEvent, map, bl);
    }

    private final void sendEvent(String topic, StreamsTransactionEvent event, Map<String, ? extends Object> config) {
        byte[] byArray;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Trying to send a transaction event with txId " + event.getMeta().getTxId() + " and txEventId " + event.getMeta().getTxEventId() + " to kafka");
        }
        byte[] key = JSONUtils.INSTANCE.writeValueAsBytes(ExtensionsKt.asSourceRecordKey(event, this.getKafkaConfig().getLogCompactionStrategy()));
        StreamsTransactionEvent streamsTransactionEvent = ExtensionsKt.asSourceRecordValue(event, this.getKafkaConfig().getLogCompactionStrategy());
        if (streamsTransactionEvent != null) {
            StreamsTransactionEvent it = streamsTransactionEvent;
            boolean bl = false;
            byArray = JSONUtils.INSTANCE.writeValueAsBytes(it);
        } else {
            byArray = null;
        }
        byte[] value = byArray;
        ProducerRecord producerRecord = new ProducerRecord(topic, null, Long.valueOf(System.currentTimeMillis()), (Object)key, (Object)value);
        KafkaEventRouter.send$default(this, producerRecord, false, 2, null);
    }

    /*
     * WARNING - void declaration
     */
    @NotNull
    public final List<Map<String, Object>> sendEventsSync(@NotNull String topic, @NotNull List<? extends StreamsEvent> transactionEvents, @NotNull Map<String, ? extends Object> config) {
        List results;
        block2: {
            void $this$mapNotNullTo$iv$iv;
            Intrinsics.checkNotNullParameter((Object)topic, (String)"topic");
            Intrinsics.checkNotNullParameter(transactionEvents, (String)"transactionEvents");
            Intrinsics.checkNotNullParameter(config, (String)"config");
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
            if (neo4jKafkaProducer != null) {
                neo4jKafkaProducer.beginTransaction();
            }
            Iterable $this$mapNotNull$iv = transactionEvents;
            boolean $i$f$mapNotNull = false;
            Iterable iterable = $this$mapNotNull$iv;
            Collection destination$iv$iv = new ArrayList();
            boolean $i$f$mapNotNullTo = false;
            void $this$forEach$iv$iv$iv = $this$mapNotNullTo$iv$iv;
            boolean $i$f$forEach = false;
            Iterator iterator = $this$forEach$iv$iv$iv.iterator();
            while (iterator.hasNext()) {
                Map<String, Object> it$iv$iv;
                Object element$iv$iv$iv;
                Object element$iv$iv = element$iv$iv$iv = iterator.next();
                boolean bl = false;
                StreamsEvent it = (StreamsEvent)element$iv$iv;
                boolean bl2 = false;
                if (this.sendEvent(topic, it, config, true) == null) continue;
                boolean bl3 = false;
                destination$iv$iv.add(it$iv$iv);
            }
            results = (List)destination$iv$iv;
            Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer2 = this.producer;
            if (neo4jKafkaProducer2 == null) break block2;
            neo4jKafkaProducer2.commitTransaction();
        }
        return results;
    }

    public final void sendEvents(@NotNull String topic, @NotNull List<? extends StreamsEvent> transactionEvents, @NotNull Map<String, ? extends Object> config) {
        block8: {
            Intrinsics.checkNotNullParameter((Object)topic, (String)"topic");
            Intrinsics.checkNotNullParameter(transactionEvents, (String)"transactionEvents");
            Intrinsics.checkNotNullParameter(config, (String)"config");
            try {
                Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
                if (neo4jKafkaProducer != null) {
                    neo4jKafkaProducer.beginTransaction();
                }
                Iterable $this$forEach$iv = transactionEvents;
                boolean $i$f$forEach = false;
                for (Object element$iv : $this$forEach$iv) {
                    StreamsEvent it = (StreamsEvent)element$iv;
                    boolean bl = false;
                    if (it instanceof StreamsTransactionEvent) {
                        this.sendEvent(topic, (StreamsTransactionEvent)it, config);
                        continue;
                    }
                    KafkaEventRouter.sendEvent$default(this, topic, it, config, false, 8, null);
                }
                Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer2 = this.producer;
                if (neo4jKafkaProducer2 == null) break block8;
                neo4jKafkaProducer2.commitTransaction();
            }
            catch (ProducerFencedException e) {
                this.log.error("Another producer with the same transactional.id has been started. Stack trace is:", (Throwable)e);
                Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
                if (neo4jKafkaProducer == null) break block8;
                neo4jKafkaProducer.close();
            }
            catch (OutOfOrderSequenceException e) {
                this.log.error("The broker received an unexpected sequence number from the producer. Stack trace is:", (Throwable)e);
                Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
                if (neo4jKafkaProducer == null) break block8;
                neo4jKafkaProducer.close();
            }
            catch (AuthorizationException e) {
                this.log.error("Error in authorization. Stack trace is:", (Throwable)e);
                Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
                if (neo4jKafkaProducer == null) break block8;
                neo4jKafkaProducer.close();
            }
            catch (KafkaException e) {
                this.log.error("Generic kafka error. Stack trace is:", (Throwable)e);
                Neo4jKafkaProducer<byte[], byte[]> neo4jKafkaProducer = this.producer;
                if (neo4jKafkaProducer == null) break block8;
                neo4jKafkaProducer.abortTransaction();
            }
        }
    }

    private static final void send$lambda$0(KafkaEventRouter this$0, ProducerRecord $producerRecord, RecordMetadata meta, Exception error) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        Intrinsics.checkNotNullParameter((Object)$producerRecord, (String)"$producerRecord");
        if (meta != null && this$0.log.isDebugEnabled()) {
            this$0.log.debug("Successfully sent record in partition " + meta.partition() + " offset " + meta.offset() + " data " + meta.topic() + " key size " + meta.serializedKeySize());
        }
        if (error != null && this$0.log.isDebugEnabled()) {
            this$0.log.debug("Error while sending record to " + $producerRecord.topic() + ", because of the following exception:", (Throwable)error);
        }
    }

    public static final /* synthetic */ Mutex access$getMutex$p(KafkaEventRouter $this) {
        return $this.mutex;
    }

    public static final /* synthetic */ Neo4jKafkaProducer access$getProducer$p(KafkaEventRouter $this) {
        return $this.producer;
    }

    public static final /* synthetic */ KafkaStatus access$status(KafkaEventRouter $this, Neo4jKafkaProducer producer) {
        return $this.status(producer);
    }

    public static final /* synthetic */ Log access$getLog$p(KafkaEventRouter $this) {
        return $this.log;
    }

    public static final /* synthetic */ KafkaConfiguration access$getKafkaConfig(KafkaEventRouter $this) {
        return $this.getKafkaConfig();
    }

    public static final /* synthetic */ void access$setProducer$p(KafkaEventRouter $this, Neo4jKafkaProducer neo4jKafkaProducer) {
        $this.producer = neo4jKafkaProducer;
    }

    public static final /* synthetic */ KafkaAdminService access$getKafkaAdminService(KafkaEventRouter $this) {
        return $this.getKafkaAdminService();
    }

    public static final /* synthetic */ Map access$getConfig$p(KafkaEventRouter $this) {
        return $this.config;
    }
}

