/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.util.queue;

import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.Xid;
import org.apache.commons.collections.Closure;
import org.apache.commons.collections.CollectionUtils;
import org.mule.runtime.core.api.transaction.xa.ResourceManagerException;
import org.mule.runtime.core.internal.util.journal.queue.XaQueueTxJournalEntry;
import org.mule.runtime.core.internal.util.journal.queue.XaTxQueueTransactionJournal;
import org.mule.runtime.core.internal.util.queue.QueueProvider;
import org.mule.runtime.core.internal.util.queue.QueueStore;
import org.mule.runtime.core.internal.util.queue.XaQueueTransactionContext;

/**
 * {@link XaQueueTransactionContext} that records the queue operations of a single XA
 * transaction (identified by its {@link Xid}) in an {@link XaTxQueueTransactionJournal}.
 *
 * <p>Additions ({@link #offer} / {@link #untake}) are only written to the journal and are applied
 * to the target queues in {@link #doCommit()}. Removals ({@link #poll}) are journaled and applied
 * to the queue immediately; {@link #doRollback()} undoes them by putting the removed values back.
 */
public class PersistentXaTransactionContext
implements XaQueueTransactionContext {
    private final XaTxQueueTransactionJournal transactionJournal;
    private final QueueProvider queueProvider;
    private final Xid xid;

    /**
     * Creates a context bound to one XA transaction.
     *
     * @param simpleTxQueueTransactionJournal journal in which every operation of this transaction is logged
     * @param queueProvider used at commit/rollback time to look up queues by the name stored in the journal
     * @param xid identifier of the XA transaction this context belongs to
     */
    public PersistentXaTransactionContext(XaTxQueueTransactionJournal simpleTxQueueTransactionJournal, QueueProvider queueProvider, Xid xid) {
        this.transactionJournal = simpleTxQueueTransactionJournal;
        this.queueProvider = queueProvider;
        this.xid = xid;
    }

    /**
     * Journals the addition of {@code item} to {@code queue}. The queue itself is not touched
     * here; the item is put into it by {@link #doCommit()}.
     *
     * @param queue queue the item is destined for
     * @param item value to add
     * @param offerTimeout not used by this implementation
     * @return always {@code true}
     */
    @Override
    public boolean offer(QueueStore queue, Serializable item, long offerTimeout) throws InterruptedException {
        this.transactionJournal.logAdd(this.xid, queue, item);
        return true;
    }

    /**
     * Journals an "add first" of {@code item} for {@code queue}. The queue itself is not touched
     * here; {@link #doCommit()} applies the entry through {@code untake} on the queue.
     *
     * @param queue queue the item is destined for
     * @param item value to return to the queue
     */
    @Override
    public void untake(QueueStore queue, Serializable item) throws InterruptedException {
        this.transactionJournal.logAddFirst(this.xid, queue, item);
    }

    /**
     * Drains {@code queue} by calling {@link #poll(QueueStore, long)} until it returns
     * {@code null}, so every removed element is journaled for this transaction.
     *
     * @param queue queue to empty
     */
    @Override
    public void clear(QueueStore queue) throws InterruptedException {
        // Hold the queue monitor for the whole drain so the individual polls are not interleaved
        // with other code synchronizing on the same queue object.
        synchronized (queue) {
            while (this.poll(queue, 100L) != null) {
                // nothing to do: poll() journals and removes each element
            }
        }
    }

    /**
     * Removes the next element of {@code queue}, journaling the removal first.
     *
     * @param queue queue to take from
     * @param pollTimeout timeout handed to {@code queue.poll}
     * @return the value returned by {@code queue.poll(pollTimeout)}, or {@code null} when
     *         {@code queue.peek()} reports no element
     */
    @Override
    public Serializable poll(QueueStore queue, long pollTimeout) throws InterruptedException {
        synchronized (queue) {
            Serializable value = queue.peek();
            if (value == null) {
                return null;
            }
            // The journal entry is written with the peeked value before the element is actually
            // removed. NOTE(review): this relies on the monitor on `queue` keeping peek() and
            // poll() consistent - confirm every other mutator synchronizes on the same object.
            this.transactionJournal.logRemove(this.xid, queue, value);
            return queue.poll(pollTimeout);
        }
    }

    /**
     * Returns the current head of {@code queue} without journaling anything.
     *
     * @param queue queue to inspect
     * @return result of {@code queue.peek()}
     */
    @Override
    public Serializable peek(QueueStore queue) throws InterruptedException {
        return queue.peek();
    }

    /**
     * Computes the queue size as seen from inside this transaction: the queue's current size plus
     * the number of add / add-first entries journaled for this transaction.
     *
     * <p>NOTE(review): journal entries are counted regardless of which queue they target, so
     * pending adds for other queues in the same transaction are included - confirm whether that
     * is intended.
     *
     * @param queue queue whose size is requested
     * @return {@code queue.getSize()} plus the number of pending additions of this transaction
     */
    @Override
    public int size(QueueStore queue) {
        Collection<XaQueueTxJournalEntry> logEntries = this.transactionJournal.getLogEntriesForTx(this.xid);
        int pendingAdds = 0;
        // A null entry collection counts as "no pending additions".
        if (logEntries != null) {
            for (XaQueueTxJournalEntry entry : logEntries) {
                if (entry.isAdd() || entry.isAddFirst()) {
                    pendingAdds++;
                }
            }
        }
        return queue.getSize() + pendingAdds;
    }

    /**
     * Applies every journaled addition of this transaction to its queue (add entries through
     * {@code putNow}, add-first entries through {@code untake}) and then logs the commit.
     * Remove entries need no work here because {@link #poll} already removed the elements.
     *
     * @throws ResourceManagerException wrapping any exception raised while applying the entries
     *         or logging the commit; if the cause is an {@link InterruptedException} the thread's
     *         interrupt status is restored before throwing
     */
    @Override
    public void doCommit() throws ResourceManagerException {
        try {
            Collection<XaQueueTxJournalEntry> logEntries = this.transactionJournal.getLogEntriesForTx(this.xid);
            for (XaQueueTxJournalEntry entry : logEntries) {
                if (entry.isAdd()) {
                    this.queueProvider.getQueue(entry.getQueueName()).putNow(entry.getValue());
                } else if (entry.isAddFirst()) {
                    this.queueProvider.getQueue(entry.getQueueName()).untake(entry.getValue());
                }
            }
            this.transactionJournal.logCommit(this.xid);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping the exception must not hide the interruption from callers up the stack.
                Thread.currentThread().interrupt();
            }
            throw new ResourceManagerException(e);
        }
    }

    /**
     * Undoes the removals of this transaction by putting every journaled removed value back into
     * its queue with {@code putNow}, then logs the rollback. Journaled additions need no work
     * because they were never applied to the queues.
     *
     * @throws ResourceManagerException if the thread is interrupted while a value is being put
     *         back; the thread's interrupt status is restored before throwing
     */
    @Override
    public void doRollback() throws ResourceManagerException {
        Collection<XaQueueTxJournalEntry> logEntries = this.transactionJournal.getLogEntriesForTx(this.xid);
        for (XaQueueTxJournalEntry entry : logEntries) {
            if (!entry.isRemove()) {
                continue;
            }
            try {
                this.queueProvider.getQueue(entry.getQueueName()).putNow(entry.getValue());
            }
            catch (InterruptedException e) {
                // Wrapping the exception must not hide the interruption from callers up the stack.
                Thread.currentThread().interrupt();
                throw new ResourceManagerException(e);
            }
        }
        this.transactionJournal.logRollback(this.xid);
    }

    /**
     * Logs the prepare phase of this transaction in the journal.
     */
    @Override
    public void doPrepare() throws ResourceManagerException {
        this.transactionJournal.logPrepare(this.xid);
    }
}

