/*
 * Decompiled with CFR 0.152.
 */
package com.swiftmq.amqp.v100.client;

import com.swiftmq.amqp.v100.client.AMQPException;
import com.swiftmq.amqp.v100.client.DeliveryMemory;
import com.swiftmq.amqp.v100.client.Link;
import com.swiftmq.amqp.v100.client.LinkClosedException;
import com.swiftmq.amqp.v100.client.MessageAvailabilityListener;
import com.swiftmq.amqp.v100.client.Session;
import com.swiftmq.amqp.v100.client.po.POFillCache;
import com.swiftmq.amqp.v100.client.po.POSendDisposition;
import com.swiftmq.amqp.v100.generated.messaging.delivery_state.DeliveryStateIF;
import com.swiftmq.amqp.v100.generated.transactions.coordination.TxnIdIF;
import com.swiftmq.amqp.v100.generated.transport.performatives.TransferFrame;
import com.swiftmq.amqp.v100.messaging.AMQPMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Receiving link that buffers incoming {@link AMQPMessage}s in a local cache
 * and hands them out through the {@code receive*} methods.
 *
 * <p>Two modes are visible in this class:
 * <ul>
 *   <li><b>Auto-refill mode</b> (constructor with a {@code linkCredit} argument):
 *       a {@link POFillCache} request is dispatched at construction time and again
 *       every time {@code currentLinkCredit} counts down to zero in
 *       {@code receive}.</li>
 *   <li><b>Acquire mode</b> (constructor without {@code linkCredit}): nothing is
 *       requested until {@link #acquire(int, TxnIdIF)} is called explicitly.</li>
 * </ul>
 *
 * <p>The cache, {@code currentLinkCredit} and the one-shot
 * {@code messageAvailabilityListener} are accessed while holding
 * {@code cacheLock} by the methods of this class.
 */
public class Consumer
extends Link {
    /** Link credit used by {@link #fillCache(long)} when the configured credit is zero. */
    private static final int DEFAULT_LINKCREDIT = 500;
    String source;
    /** Messages added via {@link #addToCache(AMQPMessage)} and not yet handed out. */
    List<AMQPMessage> cache = new ArrayList<AMQPMessage>();
    Lock cacheLock = new ReentrantLock();
    /** Signalled when the cache goes from empty to non-empty, and on close/cancel. */
    Condition cacheEmpty = null;
    volatile int linkCredit = 0;
    /** Counted down once per message handed out by {@code receive}; reset by {@link #fillCache(long)}. */
    int currentLinkCredit = 0;
    /** Stays {@code null} until {@link #setDeliveryCount(long)} has been called. */
    AtomicLong deliveryCount = null;
    boolean acquireMode = false;
    volatile TxnIdIF currentTx = null;
    /** True until the first {@link #fillCache(long)} call has completed. */
    boolean firstFillCache = true;
    TransferFrame currentMessage = null;
    /** One-shot listener registered by {@link #receiveNoWait(MessageAvailabilityListener)}. */
    MessageAvailabilityListener messageAvailabilityListener = null;

    /**
     * Creates a consumer in auto-refill mode and immediately dispatches the first
     * fill-cache request.
     *
     * <p>NOTE(review): {@code fillCache} is overridable and passes {@code this} to the
     * session before construction has finished; kept as is to preserve behaviour.
     *
     * @param mySession      owning session
     * @param source         source address this consumer reads from
     * @param name           link name
     * @param linkCredit     link credit to request; {@code 0} selects {@code DEFAULT_LINKCREDIT}
     * @param qoS            quality of service, passed through to {@link Link}
     * @param deliveryMemory delivery memory, passed through to {@link Link}
     */
    protected Consumer(Session mySession, String source, String name, int linkCredit, int qoS, DeliveryMemory deliveryMemory) {
        super(mySession, name, qoS, deliveryMemory);
        this.source = source;
        this.linkCredit = linkCredit;
        this.cacheEmpty = this.cacheLock.newCondition();
        this.fillCache(-1L);
    }

    /**
     * Creates a consumer in acquire mode: no fill-cache request is dispatched until
     * {@link #acquire(int, TxnIdIF)} is called.
     *
     * @param mySession      owning session
     * @param source         source address this consumer reads from
     * @param name           link name
     * @param qoS            quality of service, passed through to {@link Link}
     * @param deliveryMemory delivery memory, passed through to {@link Link}
     */
    protected Consumer(Session mySession, String source, String name, int qoS, DeliveryMemory deliveryMemory) {
        super(mySession, name, qoS, deliveryMemory);
        this.source = source;
        this.linkCredit = 0;
        this.acquireMode = true;
        this.cacheEmpty = this.cacheLock.newCondition();
    }

    /**
     * @return the source address given at construction time
     */
    public String getSource() {
        return this.source;
    }

    /**
     * @return the currently configured link credit
     */
    public int getLinkCredit() {
        return this.linkCredit;
    }

    /**
     * Sets the link credit that the next {@link #fillCache(long)} call will use.
     *
     * @param linkCredit new link credit; {@code 0} makes the next fill use {@code DEFAULT_LINKCREDIT}
     */
    public void setLinkCredit(int linkCredit) {
        this.linkCredit = linkCredit;
    }

    /**
     * @return the transfer frame previously stored via {@link #setCurrentMessage(TransferFrame)},
     *         or {@code null} if none
     */
    protected TransferFrame getCurrentMessage() {
        return this.currentMessage;
    }

    /**
     * Stores the given transfer frame as the current message.
     *
     * @param currentMessage frame to store, may be {@code null}
     */
    protected void setCurrentMessage(TransferFrame currentMessage) {
        this.currentMessage = currentMessage;
    }

    /**
     * Replaces the delivery counter with a fresh one starting at the given value.
     *
     * @param deliveryCount initial delivery count
     */
    @Override
    protected void setDeliveryCount(long deliveryCount) {
        this.deliveryCount = new AtomicLong(deliveryCount);
    }

    /**
     * Dispatches a {@link POFillCache} request to the session with the configured
     * link credit and resets {@code currentLinkCredit} to that value.
     *
     * <p>A configured link credit of {@code 0} is replaced by
     * {@code DEFAULT_LINKCREDIT} (and stored back into {@code linkCredit}).
     *
     * @param lastDeliveryId value forwarded to {@link POFillCache}; callers in this class
     *                       pass {@code -1} for the first fill and the current delivery
     *                       count afterwards
     */
    protected void fillCache(long lastDeliveryId) {
        // Acquire outside the try so a failed lock() never reaches unlock().
        this.cacheLock.lock();
        try {
            if (this.linkCredit == 0) {
                this.linkCredit = DEFAULT_LINKCREDIT;
            }
            this.currentLinkCredit = this.linkCredit;
            this.mySession.dispatch(new POFillCache(this, this.linkCredit, lastDeliveryId, this.currentTx));
            this.firstFillCache = false;
        }
        finally {
            this.cacheLock.unlock();
        }
    }

    /**
     * Appends a message to the cache. When the cache goes from empty to non-empty,
     * the registered one-shot availability listener (if any) is notified and cleared,
     * and one thread waiting in {@code receive} is woken.
     *
     * <p>NOTE(review): the listener callback runs while {@code cacheLock} is held;
     * kept as is to preserve ordering. The lock is reentrant, so a listener calling
     * back into {@code receive*} on the same thread does not deadlock.
     *
     * @param message message to add
     */
    protected void addToCache(AMQPMessage message) {
        this.cacheLock.lock();
        try {
            this.cache.add(message);
            if (this.cache.size() == 1) {
                if (this.messageAvailabilityListener != null) {
                    this.messageAvailabilityListener.messageAvailable(this);
                    this.messageAvailabilityListener = null;
                }
                // Exactly one message became available, so waking a single waiter is enough.
                this.cacheEmpty.signal();
            }
        }
        finally {
            this.cacheLock.unlock();
        }
    }

    /**
     * Dispatches a {@link POSendDisposition} for the given message to the session.
     *
     * @param message         message whose delivery id and delivery tag are sent
     * @param deliveryStateIF delivery state to report
     */
    public void sendDisposition(AMQPMessage message, DeliveryStateIF deliveryStateIF) {
        this.mySession.dispatch(new POSendDisposition(this, message.getDeliveryId(), message.getDeliveryTag(), deliveryStateIF));
    }

    /**
     * Sets the link credit and current transaction id, then dispatches a fill-cache
     * request. The first request uses {@code -1} as last delivery id, later ones use
     * the current delivery count.
     *
     * @param linkCredit link credit to request; {@code 0} selects {@code DEFAULT_LINKCREDIT}
     * @param txnid      transaction id forwarded with the request, may be {@code null}
     * @throws LinkClosedException if {@code verifyState()} rejects the current link state
     */
    public void acquire(int linkCredit, TxnIdIF txnid) throws LinkClosedException {
        this.verifyState();
        this.linkCredit = linkCredit;
        this.currentTx = txnid;
        this.fillCache(this.firstFillCache ? -1L : this.deliveryCount.get());
    }

    /**
     * Receives the next cached message, waiting according to {@code timeout}.
     *
     * @param timeout {@code > 0}: wait at most that many milliseconds;
     *                {@code 0}: wait until signalled;
     *                {@code -1}: do not wait (this also clears any registered
     *                availability listener if the cache is empty)
     * @return the next message, or {@code null} if the link is closed or no message
     *         was available after the wait
     */
    public AMQPMessage receive(long timeout) {
        return this.receive(timeout, null);
    }

    /**
     * Shared implementation of all {@code receive*} variants.
     *
     * <p>If the cache is empty the method waits at most once (it does not loop), so it
     * may return {@code null} after a timeout, an interrupt, a close/cancel signal or
     * a spurious wake-up. The single wait is intentional: {@link #close()} signals
     * before delegating to {@code super.close()}, so re-checking {@code closed} in a
     * loop could put the woken thread back to sleep.
     *
     * <p>For every message handed out the delivery count is incremented and
     * {@code currentLinkCredit} decremented; outside acquire mode a new fill-cache
     * request is dispatched when the credit reaches zero.
     *
     * @param timeout                     see {@link #receive(long)}
     * @param messageAvailabilityListener listener stored when {@code timeout == -1} and the
     *                                    cache is empty; may be {@code null}
     * @return the next message or {@code null}
     */
    private AMQPMessage receive(long timeout, MessageAvailabilityListener messageAvailabilityListener) {
        if (this.closed) {
            return null;
        }
        this.cacheLock.lock();
        try {
            if (this.cache.isEmpty()) {
                if (timeout > 0L) {
                    try {
                        this.cacheEmpty.await(timeout, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException interrupted) {
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                } else if (timeout == 0L) {
                    this.cacheEmpty.awaitUninterruptibly();
                } else if (timeout == -1L) {
                    this.messageAvailabilityListener = messageAvailabilityListener;
                }
            }
            if (this.cache.isEmpty()) {
                return null;
            }
            AMQPMessage msg = this.cache.remove(0);
            msg.setConsumer(this);
            long dc = this.deliveryCount.incrementAndGet();
            --this.currentLinkCredit;
            if (!this.acquireMode && this.currentLinkCredit == 0) {
                // Credit used up: request the next batch (cacheLock is reentrant).
                this.fillCache(dc);
            }
            return msg;
        }
        finally {
            this.cacheLock.unlock();
        }
    }

    /**
     * Receives the next cached message, waiting without timeout until signalled.
     *
     * @return the next message, or {@code null} if the link is closed or the wait ended
     *         without a message being available
     */
    public AMQPMessage receive() {
        return this.receive(0L);
    }

    /**
     * Returns the next cached message without waiting.
     *
     * @return the next message, or {@code null} if none is cached or the link is closed
     */
    public AMQPMessage receiveNoWait() {
        return this.receive(-1L);
    }

    /**
     * Returns the next cached message without waiting. If the cache is empty the given
     * listener is registered and notified once, on the next message that arrives in an
     * empty cache.
     *
     * @param messageAvailabilityListener one-shot listener to register when no message is cached
     * @return the next message, or {@code null} if none is cached or the link is closed
     */
    public AMQPMessage receiveNoWait(MessageAvailabilityListener messageAvailabilityListener) {
        return this.receive(-1L, messageAvailabilityListener);
    }

    /**
     * Wakes every thread waiting in {@code receive} and then closes the link via
     * {@code super.close()}.
     *
     * @throws AMQPException if {@code super.close()} fails
     */
    @Override
    public void close() throws AMQPException {
        this.cacheLock.lock();
        try {
            // signalAll: more than one thread may be blocked in receive().
            this.cacheEmpty.signalAll();
        }
        finally {
            this.cacheLock.unlock();
        }
        super.close();
    }

    /**
     * Cancels the link via {@code super.cancel()} and then wakes every thread waiting
     * in {@code receive}.
     */
    @Override
    protected void cancel() {
        super.cancel();
        this.cacheLock.lock();
        try {
            // signalAll: more than one thread may be blocked in receive().
            this.cacheEmpty.signalAll();
        }
        finally {
            this.cacheLock.unlock();
        }
    }
}

