/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.bigpipe.transport.queue;

import com.baidu.bigpipe.driver.converter.sub.MessageBodyConverter;
import com.baidu.bigpipe.protocol.McpackCommand;
import com.baidu.bigpipe.protocol.QueueAck;
import com.baidu.bigpipe.protocol.QueueMessage;
import com.baidu.bigpipe.protocol.QueueRequest;
import com.baidu.bigpipe.protocol.meta.NameService;
import com.baidu.bigpipe.protocol.meta.concept.InetAddress;
import com.baidu.bigpipe.protocol.meta.concept.QueueAddress;
import com.baidu.bigpipe.protocol.meta.exp.NameResolveException;
import com.baidu.bigpipe.protocol.meta.exp.QueueLocateException;
import com.baidu.bigpipe.protocol.pb.BigpipePBProtocol;
import com.baidu.bigpipe.transport.AbstractSessionSocketStream;
import com.baidu.bigpipe.transport.BigpipeSessionSupport;
import com.baidu.bigpipe.transport.NHeadTransportStrategy;
import com.baidu.bigpipe.transport.Receiver;
import com.baidu.bigpipe.transport.SessionSocketStream;
import com.baidu.bigpipe.transport.TransportStrategy;
import com.baidu.bigpipe.transport.conf.BigPipeConf;
import com.baidu.bigpipe.transport.conf.SocketConf;
import com.baidu.bigpipe.transport.queue.AsynchronousQueueSubscriber;
import com.baidu.bigpipe.transport.sub.PipeletIdAwareBigpipeMessageListener;
import com.baidu.mcpack.McpackException;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsynchronousQueueSubscriberBioImpl
extends BigpipeSessionSupport
implements AsynchronousQueueSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsynchronousQueueSubscriberBioImpl.class);
    private volatile Socket socket;
    private TransportStrategy transStrategy = new NHeadTransportStrategy();
    private PipeletIdAwareBigpipeMessageListener queueListener;
    private MessageBodyConverter bodyConverter;
    private BigPipeConf bigPipeConf;
    private volatile long startPoint;

    public Receiver getReciever() {
        return this.reciever;
    }

    public void setReciever(Receiver reciever) {
        this.reciever = reciever;
    }

    public TransportStrategy getTransStrategy() {
        return this.transStrategy;
    }

    public void setTransStrategy(TransportStrategy transStrategy) {
        this.transStrategy = transStrategy;
    }

    public PipeletIdAwareBigpipeMessageListener getMessageListener() {
        return this.queueListener;
    }

    public void setQueueListener(PipeletIdAwareBigpipeMessageListener messageListener) {
        this.queueListener = messageListener;
    }

    public MessageBodyConverter getBodyConverter() {
        return this.bodyConverter;
    }

    public void setBodyConverter(MessageBodyConverter bodyConverter) {
        this.bodyConverter = bodyConverter;
    }

    public BigPipeConf getBigPipeConf() {
        return this.bigPipeConf;
    }

    public void setBigPipeConf(BigPipeConf bigPipeConf) {
        this.bigPipeConf = bigPipeConf;
    }

    @Override
    public void startSubscribe() {
        this.setType(BigpipeSessionSupport.PIPE_TYPE.QUEUE.ordinal());
        this.init(this.bigPipeConf);
    }

    @Override
    public void shutDown() {
        this.lifeController.setShutDown(true);
        try {
            this.safeCloseTcpConnect();
            this.lifeController.getShutDownWait().await(this.bigPipeConf.getShutDownTimeout(), TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            LOGGER.warn("shutdown error, ignore.", (Throwable)e);
        }
    }

    @Override
    protected void continueConfig(BigPipeConf conf) {
        this.bigPipeConf = conf;
    }

    private void threadCoreFunction() {
        this.lifeController.getThreadRunning().countDown();
        while (true) {
            if (this.lifeController.isShutDown()) {
                this.lifeController.getShutDownWait().countDown();
                break;
            }
            if (this.socket == null) {
                this.buildConnect(false, this.bigPipeConf);
                if (this.lifeController.isShutDown()) {
                    this.lifeController.getShutDownWait().countDown();
                    break;
                }
            }
            this.doSubscribeMessage();
        }
    }

    private void doSubscribeMessage() {
        try {
            ByteBuffer buffer = this.reciever.blockRecieve(this.socket);
            if (buffer == null) {
                LOGGER.info("read data from bigpipe failed [Is token right from email sent from system], resubscribe.");
                this.safeCloseTcpConnect();
                return;
            }
            QueueMessage message = McpackCommand.bufferToProtocol(buffer, QueueMessage.class);
            ByteBuffer body = ByteBuffer.wrap(message.getMsg_body());
            List<Object> mList = this.extract(body);
            this.queueListener.handle(mList, message.getPipelet_id(), message.getPipelet_msg_id());
            this.sendAcknowledgement(message, false);
        }
        catch (SocketTimeoutException e) {
            LOGGER.warn("read timeout from bigpipe failed, resubscribe. It may be no new message arrives for a period time.");
            this.safeCloseTcpConnect();
        }
        catch (IOException e) {
            LOGGER.error("read from bigpipe failed,unkown io error, resubscribe.", (Throwable)e);
            this.safeCloseTcpConnect();
        }
        catch (RuntimeException e) {
            LOGGER.error("read from bigpipe failed,unkown error, resubscribe.", (Throwable)e);
            this.safeCloseTcpConnect();
        }
        catch (Exception e) {
            LOGGER.error("read from bigpipe failed,unkown error, resubscribe.", (Throwable)e);
            this.safeCloseTcpConnect();
        }
    }

    public McpackCommand sendAcknowledgement(QueueMessage message, boolean isFake) throws IOException {
        QueueAck ack = new QueueAck();
        if (isFake) {
            ack.setCmd_no(3);
        } else {
            ack.setCmd_no(2);
        }
        ack.setQueue_name(this.bigPipeConf.getQueue());
        ack.setPipe_name(message.getPipe_name());
        ack.setPipelet_id(message.getPipelet_id());
        ack.setPipelet_msg_id(message.getPipelet_msg_id());
        ack.setSeq_id(message.getSeq_id());
        try {
            ByteBuffer buffer = McpackCommand.protocolToBuffer(ack);
            this.sendByteBufferData2Socket(this.socket, buffer);
        }
        catch (IOException e) {
            throw e;
        }
        catch (McpackException e) {
            IOException ex = new IOException("McpackException happend during request" + e.getMessage());
            ex.initCause(e);
            throw ex;
        }
        return ack;
    }

    private void continueNextMessage(BigpipePBProtocol.MessageCommand messageComand) {
        BigpipePBProtocol.BigpipeCommand cmd = this.buildResponseMessage(messageComand);
        ByteBuffer buff = this.transStrategy.buildSimpleCommand(cmd.toByteArray());
        try {
            this.sendByteBufferData2Socket(this.socket, buff);
        }
        catch (IOException e) {
            LOGGER.error("write to bigpipe failed,unkown error, resubscribe.", (Throwable)e);
            this.safeCloseTcpConnect();
        }
    }

    private BigpipePBProtocol.BigpipeCommand buildResponseMessage(BigpipePBProtocol.MessageCommand messageComand) {
        BigpipePBProtocol.BigpipeCommand.Builder cmd = BigpipePBProtocol.BigpipeCommand.newBuilder();
        cmd.setType(BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_ACK);
        BigpipePBProtocol.AckCommand.Builder ackCmd = cmd.getAckBuilder();
        ackCmd.setDestination(messageComand.getDestination());
        ackCmd.setAckType(1);
        ackCmd.setTopicMessageId(messageComand.getTopicMessageId());
        ackCmd.setReceiptId(messageComand.getReceiptId());
        BigpipePBProtocol.BigpipeCommand command = cmd.build();
        return command;
    }

    private List<Object> extract(ByteBuffer buf) {
        LinkedList<Object> list = new LinkedList<Object>();
        buf.order(ByteOrder.LITTLE_ENDIAN);
        int size = buf.getInt();
        LOGGER.info("body size:" + size);
        int count = 0;
        while (buf.remaining() > 4) {
            int bodySize = buf.getInt();
            if (bodySize > buf.remaining()) {
                bodySize = buf.remaining();
            }
            byte[] array = new byte[bodySize];
            buf.get(array);
            list.add(this.bodyConverter.bin2Object(array));
            ++count;
        }
        return list;
    }

    @Override
    public void start(BigPipeConf conf) {
        this.buildConnect(true, conf);
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                AsynchronousQueueSubscriberBioImpl.this.threadCoreFunction();
            }
        });
        String threadName = "bigpipe-client-thread-";
        if (conf.getThreadName() != null) {
            threadName = threadName + conf.getThreadName() + "-";
        }
        t.setName(threadName + t.getId());
        t.start();
        try {
            this.lifeController.getThreadRunning().await();
        }
        catch (InterruptedException e) {
            LOGGER.error("start thread error.", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private long convertPositionFromFixedStartPoint(long point) {
        if (point == -1L) {
            return Long.MAX_VALUE;
        }
        return point;
    }

    @Override
    protected void waitingForConnect(int cnt) {
        LOGGER.warn("reconnect bigpipe failed.{} times", (Object)cnt);
        long sleep = 60000L;
        if (cnt < 5) {
            sleep = 500L;
        }
        if (cnt > 5) {
            LOGGER.warn("WARNING:failed to reconnect bigpipe more than 60s.");
        }
        try {
            Thread.sleep(sleep);
        }
        catch (InterruptedException e) {
            // empty catch block
        }
    }

    private void startSubcribe(Socket socket, Receiver reciever) throws IOException {
        QueueRequest request = new QueueRequest();
        request.setCmd_no(1);
        request.setQueue_name(this.bigPipeConf.getQueue());
        request.setToken(this.bigPipeConf.getToken4Queue());
        request.setWindow_size(this.bigPipeConf.getWindowSize());
        request.setApi_version("driver4j");
        try {
            ByteBuffer buf = McpackCommand.protocolToBuffer(request);
            socket.getOutputStream().write(buf.array(), 0, buf.limit());
        }
        catch (IOException e) {
            throw e;
        }
        catch (McpackException e) {
            IOException ex = new IOException("McpackException happend during request" + e.getMessage());
            ex.initCause(e);
            throw ex;
        }
    }

    private void sendByteBufferData2Socket(Socket socket, ByteBuffer buf) throws IOException {
        socket.getOutputStream().write(buf.array(), 0, buf.limit());
    }

    @Override
    protected SessionSocketStream openStream(InetAddress addr, SocketConf conf) throws IOException {
        this.socket = new Socket();
        this.socket.connect(addr.getAddress(), conf.getConectTimeout());
        this.socket.setSoTimeout(conf.getIoTimeout());
        return new AbstractSessionSocketStream(){

            @Override
            protected Socket buildSocketIfNotExist() {
                return AsynchronousQueueSubscriberBioImpl.this.socket;
            }

            @Override
            protected TransportStrategy getTransStrategy() {
                return AsynchronousQueueSubscriberBioImpl.this.transStrategy;
            }

            @Override
            protected Receiver getReceiver() {
                return AsynchronousQueueSubscriberBioImpl.this.reciever;
            }

            @Override
            protected int getRole() {
                return 5;
            }

            @Override
            protected void afterCreateSession() throws IOException {
                AsynchronousQueueSubscriberBioImpl.this.startSubcribe(AsynchronousQueueSubscriberBioImpl.this.socket, AsynchronousQueueSubscriberBioImpl.this.reciever);
            }
        };
    }

    @Override
    protected InetAddress lookupAddr(NameService ns, String queueName) throws NameResolveException, KeeperException, QueueLocateException {
        QueueAddress queueAddress = ns.lookupQueue(queueName);
        return queueAddress;
    }

    @Override
    protected void handleFastFailed(boolean needBlocking) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void safeCloseTcpConnect() {
        if (this.socket == null) {
            return;
        }
        try {
            this.socket.close();
        }
        catch (IOException e) {
            LOGGER.info("safeCloseTcpConnect", (Throwable)e);
        }
        finally {
            this.socket = null;
        }
    }
}

