/*
 * Decompiled with CFR 0.152.
 */
package com.mule.extensions.amqp.internal.connection;

import com.mule.extensions.amqp.internal.common.AmqpCommons;
import com.mule.extensions.amqp.internal.connection.AmqpConnection;
import com.mule.extensions.amqp.internal.connection.channel.TransactionStatus;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import java.io.IOException;
import java.util.Optional;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.extension.api.connectivity.TransactionalConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link AmqpConnection} that also implements {@link TransactionalConnection}, driving
 * begin/commit/rollback through the transacted {@link Channel} held by the channel manager.
 *
 * <p>The connection also tracks whether the broker has reported it as blocked, based on the
 * {@link BlockedListener} registered in the constructor.
 */
public class AmqpTransactionalConnection
extends AmqpConnection
implements TransactionalConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpTransactionalConnection.class);
    private static final String COMMIT = "Commit";
    private static final String ROLLBACK = "Rollback";

    /**
     * Last blocked/unblocked state reported through the listener registered in the constructor.
     * Declared {@code volatile} because the listener callbacks are invoked by the RabbitMQ client
     * (presumably on one of its own threads) while {@link #isBlockedByBroker()} is read by callers;
     * without it a reader could keep observing a stale value.
     */
    private volatile boolean blockedByBroker = false;

    /**
     * Wraps the given connection and registers a listener that records blocked/unblocked
     * notifications in this instance.
     *
     * @param connection the underlying RabbitMQ connection
     */
    public AmqpTransactionalConnection(Connection connection) {
        super(connection);
        connection.addBlockedListener(new BlockedListener(){

            public void handleUnblocked() throws IOException {
                AmqpTransactionalConnection.this.blockedByBroker = false;
            }

            public void handleBlocked(String reason) throws IOException {
                AmqpTransactionalConnection.this.blockedByBroker = true;
            }
        });
    }

    /**
     * Marks the transaction as started on the channel manager.
     *
     * @throws TransactionException declared by the transactional contract
     */
    public void begin() throws TransactionException {
        this.getAmqpChannelManager().changeTransactionStatus(TransactionStatus.STARTED);
    }

    /**
     * Commits the transaction on the transacted channel and then releases that channel.
     *
     * @throws TransactionException wrapping any {@link IOException} raised by the commit
     */
    public void commit() throws TransactionException {
        try {
            this.executeTransactionAction(COMMIT, Channel::txCommit, true);
        }
        catch (IOException e) {
            throw new TransactionException(e);
        }
    }

    /**
     * Rolls back the transaction on the transacted channel and then releases that channel.
     *
     * @throws TransactionException wrapping any {@link IOException} raised by the rollback
     */
    public void rollback() throws TransactionException {
        try {
            this.executeTransactionAction(ROLLBACK, Channel::txRollback, true);
        }
        catch (IOException e) {
            throw new TransactionException(e);
        }
    }

    /**
     * Runs {@code transactionalAction} against the current transacted channel.
     *
     * <p>When {@code unbind} is {@code true} the channel is closed quietly, the transaction status
     * is reset to {@link TransactionStatus#NONE} and the channel is unbound, regardless of whether
     * the action succeeded or threw.
     *
     * @param action human-readable action name, used in the log and error messages
     * @param transactionalAction the operation to perform on the transacted channel
     * @param unbind whether to close and unbind the channel once the action has run
     * @throws IOException if the action fails
     */
    private void executeTransactionAction(String action, ChannelAction transactionalAction, boolean unbind) throws IOException {
        Optional<Channel> transactedChannel = this.getAmqpChannelManager().getTransactedChannel();
        Preconditions.checkState(transactedChannel.isPresent(), "Unable to " + action + " transaction, the TX Channel doesn't exist.");
        Channel channel = transactedChannel.get();
        // Parameterized logging: the message is only formatted when debug is enabled.
        LOGGER.debug("AMQP Transaction {} over Channel [{}]", action, channel);
        try {
            transactionalAction.execute(channel);
        }
        finally {
            if (unbind) {
                AmqpCommons.closeQuietly(channel);
                this.getAmqpChannelManager().changeTransactionStatus(TransactionStatus.NONE);
                this.getAmqpChannelManager().unbindChannel();
            }
        }
    }

    /**
     * Registers an additional blocked listener on the underlying connection.
     *
     * @param muleBlockedListener the listener to add
     */
    public void addBlockedListener(BlockedListener muleBlockedListener) {
        this.connection.addBlockedListener(muleBlockedListener);
    }

    /**
     * @return {@code true} if the most recent notification received by this connection's own
     *         listener was a "blocked" one; {@code false} initially and after an "unblocked" one
     */
    public boolean isBlockedByBroker() {
        return this.blockedByBroker;
    }

    /**
     * Registers a shutdown listener on the underlying connection.
     *
     * @param shutdownListener the listener to add
     */
    public void addShutdownListener(ShutdownListener shutdownListener) {
        this.connection.addShutdownListener(shutdownListener);
    }

    /** An operation on a {@link Channel} that may fail with an {@link IOException}. */
    @FunctionalInterface
    private static interface ChannelAction {
        public void execute(Channel var1) throws IOException;
    }
}

