/*
 * Decompiled with CFR 0.152.
 */
package com.cloudhopper.mq.broker;

import com.cloudhopper.mq.broker.DistributedQueueConfiguration;
import com.cloudhopper.mq.broker.NoMoreRemoteBrokersException;
import com.cloudhopper.mq.broker.RemoteBrokerInfo;
import com.cloudhopper.mq.broker.RemoteQueueInfo;
import com.cloudhopper.mq.broker.RemoteQueueTransferListener;
import com.cloudhopper.mq.broker.TransferItem;
import com.cloudhopper.mq.broker.TransferResponseHandler;
import com.cloudhopper.mq.broker.protocol.MaxTransferAttemptsCountException;
import com.cloudhopper.mq.broker.protocol.MaxTransferCountException;
import com.cloudhopper.mq.broker.protocol.ProtocolFactory;
import com.cloudhopper.mq.broker.protocol.ProtocolParsingException;
import com.cloudhopper.mq.broker.protocol.ProtocolResponseUtil;
import com.cloudhopper.mq.broker.protocol.ProtocolResultCodeException;
import com.cloudhopper.mq.broker.protocol.TransferResponse;
import com.cloudhopper.mq.message.MQMessage;
import com.cloudhopper.mq.transcoder.TranscoderWrapped;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Response;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transfers a single {@link TransferItem} to one of the remote brokers of a
 * {@link RemoteQueueInfo} using a non-blocking HTTP POST.
 *
 * <p>The outcome is reported through the {@link TransferResponseHandler} passed to
 * {@link #transfer(TransferResponseHandler)}. When a failure is classified as retryable
 * (a non-OK protocol result code, a protocol parsing failure, or an {@link IOException}),
 * the optional {@link RemoteQueueTransferListener} is notified and the transfer is
 * re-attempted against whatever broker {@code RemoteQueueInfo.getNextRemoteBroker()}
 * returns next. Retries stop once the configured transfer / transfer-attempt limits are
 * exceeded or no broker is left.
 *
 * <p>Both the listener and the response handler may be {@code null}; every callback is
 * null-guarded.
 */
public class AsyncRemoteQueueTransfer {
    private static final Logger logger = LoggerFactory.getLogger(AsyncRemoteQueueTransfer.class);
    private final AsyncHttpClient http;
    private final DistributedQueueConfiguration dqconfig;
    private final RemoteQueueTransferListener listener;
    private final RemoteQueueInfo remoteQueue;
    private final TransferItem transferItem;

    /**
     * Creates a transfer for one item.
     *
     * @param http client used to issue the POST
     * @param dqconfig source of the max transfer count, max attempts count and group name
     * @param listener notified when a broker (or a queue on a broker) looks unavailable; may be {@code null}
     * @param remoteQueue remote queue whose brokers are tried
     * @param transferItem the item (and its originating local queue) to transfer
     */
    public AsyncRemoteQueueTransfer(AsyncHttpClient http, DistributedQueueConfiguration dqconfig, RemoteQueueTransferListener listener, RemoteQueueInfo remoteQueue, TransferItem transferItem) {
        this.http = http;
        this.dqconfig = dqconfig;
        this.listener = listener;
        this.remoteQueue = remoteQueue;
        this.transferItem = transferItem;
    }

    /**
     * Asks the remote queue for the next broker URL to try.
     *
     * @return the broker URL, never {@code null}
     * @throws NoMoreRemoteBrokersException if the remote queue has no broker to offer
     */
    private synchronized String getNextRemoteBroker() throws NoMoreRemoteBrokersException {
        String remoteBroker = this.remoteQueue.getNextRemoteBroker();
        if (remoteBroker == null) {
            throw new NoMoreRemoteBrokersException("No more RemoteBrokers available for RemoteQueue [" + this.remoteQueue.getName() + "] since call to getNextRemoteBroker() returned null during transfer attempt");
        }
        return remoteBroker;
    }

    /**
     * Looks up the broker descriptor registered under {@code url} on the remote queue.
     * Throws a {@link NullPointerException} if no entry exists for {@code url}; callers
     * must invoke this where that failure is handled.
     */
    private RemoteBrokerInfo getBrokerInfo(String url) {
        return this.remoteQueue.getRemoteBrokers().get(url).getBroker();
    }

    /** Forwards {@code t} to {@code handler}, tolerating a {@code null} handler. */
    private static void reportFailure(TransferResponseHandler handler, Throwable t) {
        if (handler != null) {
            handler.onThrowable(t);
        }
    }

    /**
     * Builds the transfer command URL for the given broker.
     *
     * NOTE(review): query values are concatenated without URL-encoding, exactly as before;
     * confirm queue names, item types and group names can never contain reserved characters.
     */
    private String buildTransferUrl(String remoteBrokerUrl, String queueName, String itemType) {
        String cmdurl = remoteBrokerUrl + "?cmd=transfer&queue=" + queueName + "&itemType=" + itemType;
        String groupName = this.dqconfig.getGroupName();
        if (groupName != null) {
            cmdurl = cmdurl + "&group=" + groupName;
        }
        return cmdurl;
    }

    /**
     * Starts (or re-starts) the asynchronous transfer of the item.
     *
     * <p>Failures detected before the request is issued (limits exceeded, no broker left,
     * broker lookup or encoding problems) are delivered to {@code handler.onThrowable};
     * they are never thrown to the caller of this method.
     *
     * @param handler receives the final outcome; may be {@code null}
     */
    public void transfer(final TransferResponseHandler handler) {
        byte transferCount = (byte)this.transferItem.getMessage().getTransferCount();
        if (transferCount + 1 > this.dqconfig.getMaxTransferCount()) {
            reportFailure(handler, new MaxTransferCountException("Max transfer count [" + transferCount + "] reached for item"));
            return;
        }
        String nextBroker;
        try {
            nextBroker = this.getNextRemoteBroker();
        }
        catch (NoMoreRemoteBrokersException e) {
            reportFailure(handler, e);
            return;
        }
        final String remoteBrokerUrl = nextBroker;
        if (this.transferItem.getMessage().getTransferAttemptsCount() + 1 > this.dqconfig.getMaxTransferAttemptsCount()) {
            reportFailure(handler, new MaxTransferAttemptsCountException("Max transfer attempts count [" + this.transferItem.getMessage().getTransferAttemptsCount() + "] exceeded on this processor for item"));
            return;
        }
        short transferAttemptsCount = this.transferItem.getMessage().incrementAndGetTransferAttemptsCount();
        try {
            // Looked up inside the try so a missing broker entry is reported to the handler
            // instead of escaping (possibly on the HTTP client's callback thread during a retry).
            RemoteBrokerInfo brokerInfo = this.getBrokerInfo(remoteBrokerUrl);
            logger.trace("[{}] Attempting transfer of item to RemoteBroker [{}] [attempt: {} transferCount: {}]", new Object[]{this.remoteQueue.getName(), remoteBrokerUrl, transferAttemptsCount, transferCount});
            AsyncCompletionHandler<TransferResponse> asyncHandler = new AsyncCompletionHandler<TransferResponse>(){

                @Override
                public TransferResponse onCompleted(Response response) throws Exception {
                    TransferResponse transferResponse;
                    try {
                        String body = response.getResponseBody();
                        transferResponse = ProtocolFactory.parseTransferResponse(body);
                        ProtocolResponseUtil.verifyResultCodeIsOK(transferResponse);
                    }
                    catch (Exception e) {
                        // Transport/protocol level failure: classify it and possibly retry.
                        this.onThrowable(e);
                        return null;
                    }
                    if (handler != null) {
                        try {
                            handler.onComplete(transferResponse);
                        }
                        catch (Exception e) {
                            // The transfer itself succeeded; a failure inside the caller's callback must
                            // not be mistaken for a broker failure (which could trigger a duplicate send).
                            logger.warn("[" + AsyncRemoteQueueTransfer.this.remoteQueue.getName() + "] Unexpected exception from handler after a successful transfer. Will not retry.", e);
                            handler.onThrowable(e);
                        }
                    }
                    return transferResponse;
                }

                @Override
                public void onThrowable(Throwable t) {
                    boolean retryable = true;
                    if (t instanceof ProtocolResultCodeException) {
                        ProtocolResultCodeException e = (ProtocolResultCodeException)t;
                        if (e.getResultCode() == 1 || e.getResultCode() == 2) {
                            logger.warn("[{}] RemoteBroker [{}] returned an error indicating the queue is either gone or no longer has a consumer", AsyncRemoteQueueTransfer.this.remoteQueue.getName(), remoteBrokerUrl);
                            if (AsyncRemoteQueueTransfer.this.listener != null) {
                                AsyncRemoteQueueTransfer.this.listener.notifyQueueOnRemoteBrokerIsNoLongerAvailable(remoteBrokerUrl, AsyncRemoteQueueTransfer.this.remoteQueue.getName());
                            }
                        } else if (e.getResultCode() == 5) {
                            logger.warn("[{}] The group on RemoteBroker [{}] no longer matches ours", AsyncRemoteQueueTransfer.this.remoteQueue.getName(), remoteBrokerUrl);
                            if (AsyncRemoteQueueTransfer.this.listener != null) {
                                AsyncRemoteQueueTransfer.this.listener.notifyRemoteBrokerIsNoLongerAvailable(remoteBrokerUrl, e.getMessage());
                            }
                        } else {
                            logger.warn("[{}] Result code [{}] from remote broker [{}] was not OK, safest action is to notifyQueueOnRemoteBrokerIsNoLongerAvailable", new Object[]{AsyncRemoteQueueTransfer.this.remoteQueue.getName(), e.getResultCode(), remoteBrokerUrl});
                            if (AsyncRemoteQueueTransfer.this.listener != null) {
                                AsyncRemoteQueueTransfer.this.listener.notifyQueueOnRemoteBrokerIsNoLongerAvailable(remoteBrokerUrl, AsyncRemoteQueueTransfer.this.remoteQueue.getName());
                            }
                        }
                    } else if (t instanceof ProtocolParsingException) {
                        logger.warn("[" + AsyncRemoteQueueTransfer.this.remoteQueue.getName() + "] Unexpected parsing issue with protocol (despite an HTTP 200 status code), safest action is to notifyQueueOnRemoteBrokerIsNoLongerAvailable", t);
                        if (AsyncRemoteQueueTransfer.this.listener != null) {
                            AsyncRemoteQueueTransfer.this.listener.notifyQueueOnRemoteBrokerIsNoLongerAvailable(remoteBrokerUrl, AsyncRemoteQueueTransfer.this.remoteQueue.getName());
                        }
                    } else if (t instanceof IOException) {
                        logger.warn("[" + AsyncRemoteQueueTransfer.this.remoteQueue.getName() + "] Unexpected IO exception, going to notifyRemoteBrokerIsNoLongerAvailable", t);
                        if (AsyncRemoteQueueTransfer.this.listener != null) {
                            AsyncRemoteQueueTransfer.this.listener.notifyRemoteBrokerIsNoLongerAvailable(remoteBrokerUrl, t.getMessage());
                        }
                    } else {
                        // Anything unclassified is handed to the caller and ends the retry chain.
                        logger.warn("[" + AsyncRemoteQueueTransfer.this.remoteQueue.getName() + "] Unexpected exception on transfer. Will not retry.", t);
                        if (handler != null) {
                            handler.onThrowable(t);
                        }
                        retryable = false;
                    }
                    if (retryable) {
                        // Re-enters transfer(); the count checks at its top bound the number of retries.
                        logger.trace("[{}] Retrying request {}", AsyncRemoteQueueTransfer.this.remoteQueue.getName(), AsyncRemoteQueueTransfer.this.transferItem);
                        AsyncRemoteQueueTransfer.this.transfer(handler);
                    }
                }
            };
            String queueName = this.remoteQueue.getName();
            String type;
            byte[] data;
            logger.trace("RemoteBroker version {}", brokerInfo.getVersion());
            logger.trace("Transcoder is {}", this.transferItem.getLocalQueue().getTranscoder().getClass().getName());
            logger.trace("Item is {}", this.transferItem.getItem().getClass().getName());
            if (brokerInfo.getVersion() == 1 && this.transferItem.getLocalQueue().getTranscoder() instanceof TranscoderWrapped && this.transferItem.getItem() instanceof MQMessage) {
                // Version 1 brokers are sent the unwrapped message body, encoded with the base transcoder.
                logger.trace("RemoteBroker[{}] is version 1", brokerInfo);
                logger.trace("Queue[{}] uses TranscoderWrapped and item is MQMessage", this.transferItem.getLocalQueue().getName());
                TranscoderWrapped tw = (TranscoderWrapped)this.transferItem.getLocalQueue().getTranscoder();
                MQMessage mqItem = (MQMessage)this.transferItem.getItem();
                type = mqItem.getBody().getClass().getCanonicalName();
                data = tw.getBaseTranscoder().encode(mqItem.getBody());
            } else {
                type = this.transferItem.getItemType().getCanonicalName();
                data = this.transferItem.getLocalQueue().getTranscoder().encode(this.transferItem.getItem());
            }
            String cmdurl = this.buildTransferUrl(remoteBrokerUrl, queueName, type);
            AsyncHttpClient.BoundRequestBuilder builder = this.http.preparePost(cmdurl);
            builder.setBody(data);
            builder.execute((AsyncHandler)asyncHandler);
        }
        catch (Exception e) {
            reportFailure(handler, e);
        }
    }
}

