/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus.amqp;

import com.microsoft.azure.servicebus.amqp.AmqpException;
import com.microsoft.azure.servicebus.amqp.AmqpUtil;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
import com.microsoft.azure.servicebus.amqp.IAmqpSender;
import com.microsoft.azure.servicebus.amqp.IIOObject;
import com.microsoft.azure.servicebus.amqp.IOperationResult;
import com.microsoft.azure.servicebus.amqp.ReactorDispatcher;
import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

public class RequestResponseChannel
implements IIOObject {
    private final Sender sendLink;
    private final Receiver receiveLink;
    private final String replyTo;
    private final HashMap<Object, IOperationResult<Message, Exception>> inflightRequests;
    private final AtomicLong requestId;
    private final AtomicInteger openRefCount;
    private final AtomicInteger closeRefCount;
    private IOperationResult<Void, Exception> onOpen;
    private IOperationResult<Void, Exception> onClose;
    private IOperationResult<Void, Exception> onGraceFullClose;

    public RequestResponseChannel(String linkName, String path, Session session) {
        this.replyTo = path.replace("$", "") + "-client-reply-to";
        this.openRefCount = new AtomicInteger(2);
        this.closeRefCount = new AtomicInteger(2);
        this.inflightRequests = new HashMap();
        this.requestId = new AtomicLong(0L);
        this.sendLink = session.sender(linkName + ":sender");
        Target target = new Target();
        target.setAddress(path);
        this.sendLink.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
        this.sendLink.setSource((org.apache.qpid.proton.amqp.transport.Source)new Source());
        this.sendLink.setSenderSettleMode(SenderSettleMode.SETTLED);
        BaseHandler.setHandler((Extendable)this.sendLink, (Handler)new SendLinkHandler(new RequestHandler()));
        this.receiveLink = session.receiver(linkName + ":receiver");
        Source source = new Source();
        source.setAddress(path);
        this.receiveLink.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        Target receiverTarget = new Target();
        receiverTarget.setAddress(this.replyTo);
        this.receiveLink.setTarget((org.apache.qpid.proton.amqp.transport.Target)receiverTarget);
        this.receiveLink.setSenderSettleMode(SenderSettleMode.SETTLED);
        this.receiveLink.setReceiverSettleMode(ReceiverSettleMode.SECOND);
        BaseHandler.setHandler((Extendable)this.receiveLink, (Handler)new ReceiveLinkHandler(new ResponseHandler()));
    }

    public void open(IOperationResult<Void, Exception> onOpen, IOperationResult<Void, Exception> onClose) {
        this.onOpen = onOpen;
        this.onClose = onClose;
        this.sendLink.open();
        this.receiveLink.open();
    }

    public void close(IOperationResult<Void, Exception> onGraceFullClose) {
        this.onGraceFullClose = onGraceFullClose;
        this.sendLink.close();
        this.receiveLink.close();
    }

    public Sender getSendLink() {
        return this.sendLink;
    }

    public Receiver getReceiveLink() {
        return this.receiveLink;
    }

    public void request(ReactorDispatcher dispatcher, final Message message, final IOperationResult<Message, Exception> onResponse) {
        if (message == null) {
            throw new IllegalArgumentException("message cannot be null");
        }
        if (message.getMessageId() != null) {
            throw new IllegalArgumentException("message.getMessageId() should be null");
        }
        if (message.getReplyTo() != null) {
            throw new IllegalArgumentException("message.getReplyTo() should be null");
        }
        message.setMessageId((Object)("request" + UnsignedLong.valueOf((long)this.requestId.incrementAndGet()).toString()));
        message.setReplyTo(this.replyTo);
        this.inflightRequests.put(message.getMessageId(), onResponse);
        try {
            dispatcher.invoke(new DispatchHandler(){

                @Override
                public void onEvent() {
                    Delivery delivery = RequestResponseChannel.this.sendLink.delivery(UUID.randomUUID().toString().replace("-", "").getBytes());
                    int payloadSize = AmqpUtil.getDataSerializedSize(message) + 512;
                    delivery.setContext((Object)onResponse);
                    byte[] bytes = new byte[payloadSize];
                    int encodedSize = message.encode(bytes, 0, payloadSize);
                    RequestResponseChannel.this.receiveLink.flow(1);
                    RequestResponseChannel.this.sendLink.send(bytes, 0, encodedSize);
                    RequestResponseChannel.this.sendLink.advance();
                }
            });
        }
        catch (IOException ioException) {
            onResponse.onError(ioException);
        }
    }

    private void onLinkOpenComplete(Exception exception) {
        if (this.openRefCount.decrementAndGet() <= 0 && this.onOpen != null) {
            if (exception == null && this.sendLink.getRemoteState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE) {
                this.onOpen.onComplete(null);
            } else if (exception != null) {
                this.onOpen.onError(exception);
            } else {
                ErrorCondition error = this.sendLink.getRemoteCondition() != null && this.sendLink.getRemoteCondition().getCondition() != null ? this.sendLink.getRemoteCondition() : this.receiveLink.getRemoteCondition();
                this.onOpen.onError(new AmqpException(error));
            }
        }
    }

    private void onLinkCloseComplete(Exception exception) {
        if (this.closeRefCount.decrementAndGet() <= 0) {
            if (exception == null) {
                this.onClose.onComplete(null);
                if (this.onGraceFullClose != null) {
                    this.onGraceFullClose.onComplete(null);
                }
            } else {
                this.onClose.onError(exception);
                if (this.onGraceFullClose != null) {
                    this.onGraceFullClose.onError(exception);
                }
            }
        }
    }

    @Override
    public IIOObject.IOObjectState getState() {
        if (this.sendLink.getLocalState() == EndpointState.UNINITIALIZED || this.receiveLink.getLocalState() == EndpointState.UNINITIALIZED || this.sendLink.getRemoteState() == EndpointState.UNINITIALIZED || this.receiveLink.getRemoteState() == EndpointState.UNINITIALIZED) {
            return IIOObject.IOObjectState.OPENING;
        }
        if (this.sendLink.getRemoteState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE && this.sendLink.getLocalState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE) {
            return IIOObject.IOObjectState.OPENED;
        }
        if (this.sendLink.getRemoteState() == EndpointState.CLOSED && this.receiveLink.getRemoteState() == EndpointState.CLOSED) {
            return IIOObject.IOObjectState.CLOSED;
        }
        return IIOObject.IOObjectState.CLOSING;
    }

    private class ResponseHandler
    implements IAmqpReceiver {
        private ResponseHandler() {
        }

        @Override
        public void onReceiveComplete(Delivery delivery) {
            Message response = Proton.message();
            int msgSize = delivery.pending();
            byte[] buffer = new byte[msgSize];
            int read = RequestResponseChannel.this.receiveLink.recv(buffer, 0, msgSize);
            response.decode(buffer, 0, read);
            delivery.settle();
            IOperationResult responseCallback = (IOperationResult)RequestResponseChannel.this.inflightRequests.remove(response.getCorrelationId());
            if (responseCallback != null) {
                responseCallback.onComplete(response);
            }
        }

        @Override
        public void onOpenComplete(Exception completionException) {
            RequestResponseChannel.this.onLinkOpenComplete(completionException);
        }

        @Override
        public void onError(Exception exception) {
            for (IOperationResult responseCallback : RequestResponseChannel.this.inflightRequests.values()) {
                responseCallback.onError(exception);
            }
            RequestResponseChannel.this.inflightRequests.clear();
            if (RequestResponseChannel.this.onClose != null) {
                RequestResponseChannel.this.onLinkCloseComplete(exception);
            }
        }

        @Override
        public void onClose(ErrorCondition condition) {
            if (condition == null || condition.getCondition() == null) {
                RequestResponseChannel.this.onLinkCloseComplete(null);
            } else {
                this.onError(new AmqpException(condition));
            }
        }
    }

    private class RequestHandler
    implements IAmqpSender {
        private RequestHandler() {
        }

        @Override
        public void onFlow(int creditIssued) {
        }

        @Override
        public void onSendComplete(Delivery delivery) {
        }

        @Override
        public void onOpenComplete(Exception completionException) {
            RequestResponseChannel.this.onLinkOpenComplete(completionException);
        }

        @Override
        public void onError(Exception exception) {
            RequestResponseChannel.this.onLinkCloseComplete(exception);
        }

        @Override
        public void onClose(ErrorCondition condition) {
            if (condition == null || condition.getCondition() == null) {
                RequestResponseChannel.this.onLinkCloseComplete(null);
            } else {
                this.onError(new AmqpException(condition));
            }
        }
    }
}

