/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.pulsar.transactions;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.pulsar.PulsarClientService;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
import io.smallrye.reactive.messaging.pulsar.transactions.TransactionalEmitter;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.eclipse.microprofile.reactive.messaging.Message;

public class PulsarTransactionsImpl<T>
extends MutinyEmitterImpl<T>
implements PulsarTransactions<T> {
    private final PulsarClient pulsarClient;
    private final AtomicInteger txnCount = new AtomicInteger();
    private final PulsarClientService pulsarClientService;
    private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();

    public PulsarTransactionsImpl(EmitterConfiguration config, long defaultBufferSize, PulsarClientService pulsarClientService) {
        super(config, defaultBufferSize);
        this.pulsarClientService = pulsarClientService;
        this.pulsarClient = pulsarClientService.getClient(config.name());
    }

    @Override
    public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
        return new PulsarTransactionEmitter<R>().execute(work);
    }

    @Override
    public <R> Uni<R> withTransaction(Duration txnTimeout, Function<TransactionalEmitter<T>, Uni<R>> work) {
        return new PulsarTransactionEmitter<R>(txnTimeout).execute(work);
    }

    @Override
    public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
        return this.withTransaction(null, message, work);
    }

    @Override
    public <R> Uni<R> withTransaction(Duration txnTimeout, Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
        return new PulsarTransactionEmitter<Object>(txnTimeout, txn -> Uni.createFrom().completionStage(message.ack()), PulsarTransactionsImpl::defaultAfterCommit, (txn, throwable) -> {
            if (!(throwable.getCause() instanceof PulsarClientException.TransactionConflictException)) {
                return Uni.createFrom().completionStage(() -> message.nack(throwable));
            }
            return VOID_UNI;
        }, PulsarTransactionsImpl::defaultAfterAbort).execute(e -> {
            if (message instanceof MetadataInjectableMessage) {
                ((MetadataInjectableMessage)message).injectMetadata((Object)new PulsarTransactionMetadata(e.getTransaction(null)));
            }
            return (Uni)work.apply((TransactionalEmitter)e);
        });
    }

    @Override
    public <M extends Message<? extends T>> void send(TransactionalEmitter<?> emitter, M msg) {
        Transaction transaction = emitter.getTransaction(this.name);
        this.sendMessage(msg.addMetadata((Object)new PulsarTransactionMetadata(transaction))).subscribe().with(unused -> {}, PulsarLogging.log::unableToDispatch);
    }

    @Override
    public void send(TransactionalEmitter<?> emitter, T payload) {
        this.send(emitter, (T)Message.of(payload));
    }

    @Override
    public boolean isTransactionInProgress() {
        return this.txnCount.get() != 0;
    }

    private static <R> Uni<R> defaultAfterCommit(R result) {
        return Uni.createFrom().item(result);
    }

    private static <R> Uni<R> defaultAfterAbort(Throwable throwable) {
        return Uni.createFrom().failure(throwable);
    }

    private class PulsarTransactionEmitter<R>
    implements TransactionalEmitter<T> {
        private final Function<Transaction, Uni<Void>> beforeCommit;
        private final Function<R, Uni<R>> afterCommit;
        private final BiFunction<Transaction, Throwable, Uni<Void>> beforeAbort;
        private final Function<Throwable, Uni<R>> afterAbort;
        private final Duration txnTimeout;
        private volatile boolean abort;
        private volatile Transaction currentTransaction;
        private final Set<String> producerChannels = new HashSet<String>();

        public PulsarTransactionEmitter() {
            this(null, txn -> VOID_UNI, x$0 -> PulsarTransactionsImpl.defaultAfterCommit(x$0), (txn, throwable) -> VOID_UNI, x$0 -> PulsarTransactionsImpl.defaultAfterAbort(x$0));
        }

        public PulsarTransactionEmitter(Duration txnTimeout) {
            this(txnTimeout, txn -> VOID_UNI, x$0 -> PulsarTransactionsImpl.defaultAfterCommit(x$0), (txn, throwable) -> VOID_UNI, x$0 -> PulsarTransactionsImpl.defaultAfterAbort(x$0));
        }

        public PulsarTransactionEmitter(Duration txnTimeout, Function<Transaction, Uni<Void>> beforeCommit, Function<R, Uni<R>> afterCommit, BiFunction<Transaction, Throwable, Uni<Void>> beforeAbort, Function<Throwable, Uni<R>> afterAbort) {
            this.txnTimeout = txnTimeout;
            this.beforeCommit = beforeCommit;
            this.afterCommit = afterCommit;
            this.beforeAbort = beforeAbort;
            this.afterAbort = afterAbort;
        }

        private Uni<Transaction> createTransaction() {
            return Uni.createFrom().emitter(emitter -> {
                TransactionBuilder builder = PulsarTransactionsImpl.this.pulsarClient.newTransaction();
                if (this.txnTimeout != null) {
                    builder = builder.withTransactionTimeout(this.txnTimeout.toMillis(), TimeUnit.MILLISECONDS);
                }
                builder.build().whenComplete((transaction, throwable) -> {
                    if (throwable == null) {
                        emitter.complete(transaction);
                    } else {
                        emitter.fail(throwable);
                    }
                });
            });
        }

        private Uni<Void> flushProducers() {
            return Multi.createFrom().iterable(this.producerChannels).map(PulsarTransactionsImpl.this.pulsarClientService::getProducer).filter(Objects::nonNull).onItem().transformToUniAndMerge(p -> Uni.createFrom().completionStage((CompletionStage)p.flushAsync())).toUni();
        }

        Uni<R> execute(Function<TransactionalEmitter<T>, Uni<R>> work) {
            Context context = Vertx.currentContext();
            Uni transaction = this.createTransaction();
            if (context != null) {
                transaction = transaction.emitOn(runnable -> context.runOnContext(x -> runnable.run()));
            }
            return transaction.invoke(txn -> {
                this.currentTransaction = txn;
                PulsarTransactionsImpl.this.txnCount.incrementAndGet();
            }).chain(txn -> this.executeInTransaction(work)).eventually(() -> {
                PulsarTransactionsImpl.this.txnCount.decrementAndGet();
                this.currentTransaction = null;
            });
        }

        private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
            return Uni.createFrom().nullItem().chain(() -> (Uni)work.apply(this)).call(() -> this.flushProducers()).onFailure().call(throwable -> this.abort((Throwable)throwable)).onCancellation().call(() -> this.abort(new RuntimeException("Transaction cancelled"))).call(() -> this.abort ? this.abort(new RuntimeException("Transaction aborted")) : this.commit()).onFailure().recoverWithUni(throwable -> this.afterAbort.apply((Throwable)throwable)).onItem().transformToUni(result -> this.afterCommit.apply(result));
        }

        private Uni<Void> commit() {
            return this.beforeCommit.apply(this.currentTransaction).chain(txn -> Uni.createFrom().completionStage(() -> ((Transaction)this.currentTransaction).commit()));
        }

        private Uni<Void> abort(Throwable throwable) {
            return this.beforeAbort.apply(this.currentTransaction, throwable).chain(x -> Uni.createFrom().completionStage(() -> ((Transaction)this.currentTransaction).abort())).plug(uni -> this.abort ? uni.chain(() -> Uni.createFrom().failure(throwable)) : uni);
        }

        @Override
        public <M extends Message<? extends T>> void send(M msg) {
            PulsarTransactionsImpl.this.send((TransactionalEmitter<?>)this, msg);
        }

        @Override
        public void send(T payload) {
            this.send((Object)Message.of(payload));
        }

        @Override
        public void markForAbort() {
            this.abort = true;
        }

        @Override
        public boolean isMarkedForAbort() {
            return this.abort;
        }

        @Override
        public Transaction getTransaction(String name) {
            if (name != null) {
                this.producerChannels.add(name);
            }
            return this.currentTransaction;
        }
    }
}

