/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.transactions;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata;
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.smallrye.reactive.messaging.kafka.transactions.TransactionalEmitter;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.eclipse.microprofile.reactive.messaging.Message;

/**
 * Emitter that runs user-supplied work inside a Kafka producer transaction.
 *
 * <p>At most one transaction is tracked at a time through {@link #currentTransaction};
 * asking for a new transaction while one is registered fails with the exception built by
 * {@code KafkaExceptions.ex.transactionInProgress}.
 *
 * @param <T> the payload type sent through this emitter
 */
public class KafkaTransactionsImpl<T>
extends MutinyEmitterImpl<T>
implements KafkaTransactions<T> {
    /** Shared, already-completed {@code Uni<Void>} used as the "nothing to do" hook. */
    private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();

    private final KafkaClientService clientService;
    private final KafkaProducer<?, ?> producer;
    /** The transaction currently registered on this emitter, or {@code null} when there is none. */
    private volatile Transaction<?> currentTransaction;
    /** Guards the check-then-start sequence on {@link #currentTransaction}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Creates the emitter and looks up the producer registered under the channel name.
     *
     * @param config the emitter configuration; {@code config.name()} selects the producer
     * @param defaultBufferSize forwarded to the parent emitter
     * @param clientService gives access to the Kafka producer and consumers
     */
    public KafkaTransactionsImpl(EmitterConfiguration config, long defaultBufferSize, KafkaClientService clientService) {
        super(config, defaultBufferSize);
        this.clientService = clientService;
        this.producer = clientService.getProducer(config.name());
    }

    /**
     * @return {@code true} when a transaction is currently registered on this emitter
     */
    @Override
    public boolean isTransactionInProgress() {
        this.lock.lock();
        try {
            return this.currentTransaction != null;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Runs {@code work} inside a new producer transaction with no extra commit/abort hooks.
     *
     * <p>Fails immediately (synchronously) with the {@code transactionInProgress} exception
     * when another transaction is already registered.
     *
     * @param work the function producing records through the given transactional emitter
     * @param <R> the type of the result produced by {@code work}
     * @return a {@code Uni} emitting the result of {@code work} once the transaction is committed,
     *         or the failure after the transaction is aborted
     */
    @Override
    @CheckReturnValue
    public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
        this.lock.lock();
        try {
            if (this.currentTransaction == null) {
                return new Transaction<R>().execute(work);
            }
            throw KafkaExceptions.ex.transactionInProgress(this.name);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Runs {@code work} inside a new producer transaction and, before committing, sends the
     * consumed offsets carried by {@code message} to that transaction.
     *
     * <p>The offsets are read from the message's {@link IncomingKafkaRecordBatchMetadata} when
     * present, otherwise from its {@link IncomingKafkaRecordMetadata}; each offset is incremented
     * by one before being sent. The channel named in the metadata must have exactly one consumer.
     * If the consumer group generation differs from the one recorded in the metadata, the
     * before-commit step fails with the {@code exactlyOnceProcessingRebalance} exception. After an
     * abort, the consumer is reset to its last committed positions and the failure is propagated.
     *
     * <p>The following conditions fail immediately (synchronously), in this order: no Kafka
     * metadata on the message, no consumer for the channel, more than one consumer for the
     * channel, a transaction already in progress.
     *
     * @param message the incoming message whose offsets take part in the transaction
     * @param work the function producing records through the given transactional emitter
     * @param <R> the type of the result produced by {@code work}
     * @return a {@code Uni} emitting the result of {@code work} once the transaction is committed,
     *         or the failure after the transaction is aborted
     */
    @SuppressWarnings("rawtypes") // Class literals of generic metadata types are necessarily raw.
    @Override
    @CheckReturnValue
    public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
        this.lock.lock();
        try {
            // Assigned exactly once on every path so they stay effectively final for the lambdas below.
            String channel;
            int generationId;
            Map<TopicPartition, OffsetAndMetadata> offsets;

            Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message.getMetadata(IncomingKafkaRecordBatchMetadata.class);
            Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
            if (batchMetadata.isPresent()) {
                IncomingKafkaRecordBatchMetadata<?, ?> metadata = batchMetadata.get();
                channel = metadata.getChannel();
                generationId = metadata.getConsumerGroupGenerationId();
                // offset + 1: the offset sent to the transaction is the one following the consumed record.
                offsets = metadata.getOffsets().entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
            } else if (recordMetadata.isPresent()) {
                IncomingKafkaRecordMetadata<?, ?> metadata = recordMetadata.get();
                channel = metadata.getChannel();
                offsets = new HashMap<>();
                generationId = metadata.getConsumerGroupGenerationId();
                offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
                        new OffsetAndMetadata(metadata.getOffset() + 1));
            } else {
                throw KafkaExceptions.ex.noKafkaMetadataFound(message);
            }

            List<KafkaConsumer<Object, Object>> consumers = this.clientService.getConsumers(channel);
            if (consumers.isEmpty()) {
                throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
            }
            if (consumers.size() > 1) {
                throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
            }
            KafkaConsumer<Object, Object> consumer = consumers.get(0);

            if (this.currentTransaction == null) {
                // Before commit: attach the consumed offsets to the transaction, unless the
                // consumer group generation changed since the message was received.
                Uni<Void> sendOffsets = consumer.consumerGroupMetadata().chain(groupMetadata -> {
                    if (groupMetadata.generationId() == generationId) {
                        this.producer.unwrap().sendOffsetsToTransaction(offsets, groupMetadata);
                        return Uni.createFrom().voidItem();
                    }
                    return Uni.createFrom().failure(KafkaExceptions.ex.exactlyOnceProcessingRebalance(
                            channel, groupMetadata.toString(), String.valueOf(generationId)));
                });
                // After abort: rewind the consumer, then surface the original failure.
                Function<Throwable, Uni<R>> resetAndFail = t -> consumer.resetToLastCommittedPositions()
                        .chain(() -> Uni.createFrom().failure(t));
                return new Transaction<R>(sendOffsets, KafkaTransactionsImpl::defaultAfterCommit, VOID_UNI, resetAndFail)
                        .execute(work);
            }
            throw KafkaExceptions.ex.transactionInProgress(this.name);
        } finally {
            this.lock.unlock();
        }
    }

    /** Default after-commit hook: emits the result unchanged. */
    private static <R> Uni<R> defaultAfterCommit(R result) {
        return Uni.createFrom().item(result);
    }

    /** Default after-abort hook: propagates the failure unchanged. */
    private static <R> Uni<R> defaultAfterAbort(Throwable throwable) {
        return Uni.createFrom().failure(throwable);
    }

    /**
     * A single producer transaction together with its commit/abort hooks. It is also the
     * {@link TransactionalEmitter} handed to the user's work function.
     *
     * @param <R> the type of the result produced by the work function
     */
    private class Transaction<R>
    implements TransactionalEmitter<T> {
        /** Runs before {@code producer.commitTransaction()}. */
        private final Uni<Void> beforeCommit;
        /** Maps the work result to the final result after a successful commit. */
        private final Function<R, Uni<R>> afterCommit;
        /** Runs before {@code producer.abortTransaction()}. */
        private final Uni<Void> beforeAbort;
        /** Maps the failure to the final outcome after an abort. */
        private final Function<Throwable, Uni<R>> afterAbort;
        /** One entry per record sent through this transaction; awaited before flushing. */
        private final List<Uni<Void>> sendUnis = new CopyOnWriteArrayList<>();
        /** Set by {@link #markForAbort()}; forces an abort instead of a commit. */
        private volatile boolean abort;

        /** Creates a transaction with no-op before hooks and pass-through after hooks. */
        public Transaction() {
            this(VOID_UNI, KafkaTransactionsImpl::defaultAfterCommit, VOID_UNI, KafkaTransactionsImpl::defaultAfterAbort);
        }

        /**
         * @param beforeCommit run before committing the producer transaction
         * @param afterCommit applied to the work result once the pipeline completes with an item
         * @param beforeAbort run before aborting the producer transaction
         * @param afterAbort applied to the failure once the pipeline completes with a failure
         */
        public Transaction(Uni<Void> beforeCommit, Function<R, Uni<R>> afterCommit, Uni<Void> beforeAbort, Function<Throwable, Uni<R>> afterAbort) {
            this.beforeCommit = beforeCommit;
            this.afterCommit = afterCommit;
            this.beforeAbort = beforeAbort;
            this.afterAbort = afterAbort;
        }

        /**
         * Registers this transaction as the current one, begins the producer transaction and
         * runs {@code work} in it. The current transaction is cleared when the pipeline terminates.
         *
         * <p>Note: the registration happens when this method is called, not when the returned
         * {@code Uni} is subscribed.
         *
         * @param work the user's work function
         * @return the {@code Uni} representing the whole transaction
         */
        Uni<R> execute(Function<TransactionalEmitter<T>, Uni<R>> work) {
            KafkaTransactionsImpl.this.currentTransaction = this;
            // Captures the caller's Vert.x context (if any) so that work and the final
            // result are emitted back on it.
            ContextExecutor executor = new ContextExecutor();
            return KafkaTransactionsImpl.this.producer.beginTransaction()
                    .plug(executor::emitOn)
                    .chain(() -> this.executeInTransaction(work))
                    // Block lambda on purpose: selects the Runnable overload of eventually().
                    .eventually(() -> {
                        KafkaTransactionsImpl.this.currentTransaction = null;
                    })
                    .plug(executor::emitOn);
        }

        /**
         * Runs {@code work}, waits for the sends, flushes, then commits or aborts and finally
         * applies the after-commit / after-abort hook.
         */
        private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
            // Lambdas (not method references) are kept below: several Mutiny operators are
            // overloaded on Runnable/Supplier and a method reference would be ambiguous.
            return Uni.createFrom().nullItem()
                    .chain(() -> work.apply(this))
                    // Wait for every send() issued by the work, whether it succeeded or failed.
                    .eventually(() -> this.waitOnSend())
                    // Flush only when the work (and the sends) completed without failure.
                    .call(() -> KafkaTransactionsImpl.this.producer.flush())
                    // A failure or a cancellation rolls the transaction back.
                    .onFailure().call(throwable -> this.abort())
                    .onCancellation().call(() -> this.abort())
                    // No failure so far: abort if requested, otherwise commit; a failed commit
                    // is logged and turned into an abort.
                    .call(() -> this.abort ? this.abort() : this.commit().onFailure().recoverWithUni(throwable -> {
                        KafkaLogging.log.transactionCommitFailed(throwable);
                        return this.abort();
                    }))
                    // Finally hand the outcome to the after-abort / after-commit hooks.
                    .onFailure().recoverWithUni(throwable -> this.afterAbort.apply(throwable))
                    .onItem().transformToUni(result -> this.afterCommit.apply(result));
        }

        /** Joins all pending sends, collecting failures; completes immediately when nothing was sent. */
        private Uni<List<Void>> waitOnSend() {
            return this.sendUnis.isEmpty() ? Uni.createFrom().nullItem() : Uni.join().all(this.sendUnis).andCollectFailures();
        }

        /** Runs the before-commit hook, then commits the producer transaction. */
        private Uni<Void> commit() {
            return this.beforeCommit.call(KafkaTransactionsImpl.this.producer::commitTransaction);
        }

        /**
         * Runs the before-abort hook, then aborts the producer transaction. When the transaction
         * was marked for abort, the returned {@code Uni} additionally fails with a
         * {@link TransactionAbortedException}.
         */
        private Uni<Void> abort() {
            Uni<Void> aborted = this.beforeAbort.call(KafkaTransactionsImpl.this.producer::abortTransaction);
            return this.abort ? aborted.chain(() -> Uni.createFrom().failure(new TransactionAbortedException())) : aborted;
        }

        /**
         * Sends the message right away and remembers its completion so that the transaction
         * waits for it before flushing. Send failures are logged.
         */
        @Override
        public <M extends Message<? extends T>> void send(M msg) {
            // Subscribing eagerly triggers the send now; the stage is replayed as a Uni later.
            CompletableFuture<Void> pending = KafkaTransactionsImpl.this.sendMessage(msg)
                    .onFailure().invoke(KafkaLogging.log::unableToSendRecord)
                    .subscribeAsCompletionStage();
            this.sendUnis.add(Uni.createFrom().completionStage(pending));
        }

        /**
         * Sends the payload right away and remembers its completion so that the transaction
         * waits for it before flushing. Send failures are logged.
         */
        @Override
        public void send(T payload) {
            // Subscribing eagerly triggers the send now; the stage is replayed as a Uni later.
            CompletableFuture<Void> pending = KafkaTransactionsImpl.this.send(payload)
                    .onFailure().invoke(KafkaLogging.log::unableToSendRecord)
                    .subscribeAsCompletionStage();
            this.sendUnis.add(Uni.createFrom().completionStage(pending));
        }

        /** Requests that this transaction be aborted instead of committed. */
        @Override
        public void markForAbort() {
            this.abort = true;
        }

        /** @return {@code true} when {@link #markForAbort()} has been called */
        @Override
        public boolean isMarkedForAbort() {
            return this.abort;
        }
    }

    /**
     * Executor that runs commands on the Vert.x context captured at construction time, or
     * inline on the calling thread when no context was captured.
     */
    private static class ContextExecutor
    implements Executor {
        /** The captured Vert.x context; {@code null} when created outside of one. */
        private final Context context;
        /** Whether the creating thread was an event-loop thread. */
        private final boolean ioThread;

        /** Captures the current Vert.x context and whether the caller is on an event-loop thread. */
        ContextExecutor() {
            this(Vertx.currentContext(), Context.isOnEventLoopThread());
        }

        ContextExecutor(Context context, boolean ioThread) {
            this.context = context;
            this.ioThread = ioThread;
        }

        /** Returns {@code uni} unchanged when no context was captured, otherwise emits it on this executor. */
        <T> Uni<T> emitOn(Uni<T> uni) {
            return this.context == null ? uni : uni.emitOn(this);
        }

        /**
         * Runs the command inline without a context, on the context when it was captured from
         * an event-loop thread, and through {@code executeBlocking} otherwise.
         */
        @Override
        public void execute(Runnable command) {
            if (this.context == null) {
                command.run();
            } else if (this.ioThread) {
                VertxContext.runOnContext(this.context, command);
            } else {
                VertxContext.executeBlocking(this.context, command);
            }
        }
    }
}

