/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.bigpipe.transport.sub;

import com.baidu.bigpipe.driver.converter.sub.MessageBodyConverter;
import com.baidu.bigpipe.position.store.SubcribePositionStore;
import com.baidu.bigpipe.protocol.BigpipePacket;
import com.baidu.bigpipe.protocol.meta.NameService;
import com.baidu.bigpipe.protocol.meta.concept.InetAddress;
import com.baidu.bigpipe.protocol.meta.concept.TopicAddress;
import com.baidu.bigpipe.protocol.meta.exp.NameResolveException;
import com.baidu.bigpipe.protocol.pb.BigpipePBProtocol;
import com.baidu.bigpipe.transport.AbstractSessionSocketStream;
import com.baidu.bigpipe.transport.BigpipeSessionSupport;
import com.baidu.bigpipe.transport.NHeadTransportStrategy;
import com.baidu.bigpipe.transport.Receiver;
import com.baidu.bigpipe.transport.SessionSocketStream;
import com.baidu.bigpipe.transport.TransportStrategy;
import com.baidu.bigpipe.transport.conf.BigPipeConf;
import com.baidu.bigpipe.transport.conf.SocketConf;
import com.baidu.bigpipe.transport.sub.AsynchronousSubscriber;
import com.baidu.bigpipe.transport.sub.BigpipeMessageListener;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Blocking-socket ("BIO") implementation of {@link AsynchronousSubscriber}.
 *
 * <p>{@link #start(BigPipeConf)} opens the first connection and then spawns a
 * single worker thread. The worker repeatedly reads one packet from the
 * socket, converts its bodies with the configured {@link MessageBodyConverter},
 * hands them to the {@link BigpipeMessageListener} and sends an ACK back. Any
 * read/write failure closes the socket; the next loop iteration reconnects and
 * re-subscribes from {@link #startPoint}.
 *
 * <p>{@link #socket} and {@link #startPoint} are {@code volatile} because they
 * are touched by both the worker thread and the thread calling
 * {@link #shutDown()}.
 */
public class AsynchronousSubscriberBioImpl
extends BigpipeSessionSupport
implements AsynchronousSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsynchronousSubscriberBioImpl.class);

    /**
     * Configured start point that is resolved, in {@link #ensureStartPoint(long)},
     * to the begin position of the stripe returned by the name service for position 0.
     */
    private static final long START_POINT_STRIPE_BEGIN = -2L;

    /**
     * Start point that is sent to the server unchanged but is translated to
     * {@link Long#MAX_VALUE} when looking up the broker address.
     */
    private static final long START_POINT_MAX_POSITION = -1L;

    /** Reconnect attempts below this count use the short back-off. */
    private static final int FAST_RETRY_LIMIT = 5;
    private static final long FAST_RETRY_SLEEP_MILLIS = 500L;
    private static final long SLOW_RETRY_SLEEP_MILLIS = 60000L;

    /** Current connection; {@code null} means "not connected, reconnect on next loop". */
    private volatile Socket socket;
    private TransportStrategy transStrategy = new NHeadTransportStrategy();
    private BigpipeMessageListener messageListener;
    private MessageBodyConverter bodyConverter;
    private SubcribePositionStore positionStore;
    private BigPipeConf bigPipeConf;
    /** Position sent in the next subscribe command. */
    private volatile long startPoint;

    public Receiver getReciever() {
        return this.reciever;
    }

    public void setReciever(Receiver reciever) {
        this.reciever = reciever;
    }

    public TransportStrategy getTransStrategy() {
        return this.transStrategy;
    }

    public void setTransStrategy(TransportStrategy transStrategy) {
        this.transStrategy = transStrategy;
    }

    public BigpipeMessageListener getMessageListener() {
        return this.messageListener;
    }

    public void setMessageListener(BigpipeMessageListener messageListener) {
        this.messageListener = messageListener;
    }

    public MessageBodyConverter getBodyConverter() {
        return this.bodyConverter;
    }

    public void setBodyConverter(MessageBodyConverter bodyConverter) {
        this.bodyConverter = bodyConverter;
    }

    public SubcribePositionStore getPositionStore() {
        return this.positionStore;
    }

    public void setPositionStore(SubcribePositionStore positionStore) {
        this.positionStore = positionStore;
    }

    /**
     * Registers the listener and initialises the subscriber via {@code init(conf)}.
     *
     * @param listener receives every non-empty batch of converted messages
     * @param conf     subscriber configuration
     */
    @Override
    public void startSubscribe(BigpipeMessageListener listener, BigPipeConf conf) {
        this.messageListener = listener;
        this.init(conf);
    }

    /**
     * Flags the subscriber as shut down, closes the socket so a blocked read
     * returns, and waits for the worker thread to acknowledge.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is
     * abandoned and the interrupt status is restored for the caller.
     */
    @Override
    public void shutDown() {
        this.lifeController.setShutDown(true);
        try {
            this.safeCloseTcpConnect();
            this.lifeController.getShutDownWait().await(this.bigPipeConf.getShutDownTimeout(), TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            // Stop waiting, but do not hide the interrupt from the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores the configuration and resolves the initial subscribe position.
     *
     * @throws RuntimeException wrapping any name-service failure
     */
    @Override
    protected void continueConfig(BigPipeConf conf) {
        this.bigPipeConf = conf;
        try {
            this.ensureStartPoint(this.initStartPoint(conf));
        }
        catch (NameResolveException e) {
            throw new RuntimeException(e);
        }
        catch (KeeperException ke) {
            throw new RuntimeException(ke);
        }
    }

    /**
     * Body of the worker thread: signals that it is running, then loops
     * "reconnect if needed, read one packet" until shutdown is requested.
     *
     * <p>The shutdown latch is released in a {@code finally} block so that
     * {@link #shutDown()} does not wait for its full timeout when the loop is
     * left through an unexpected exception.
     */
    private void threadCoreFunction() {
        this.lifeController.getThreadRunning().countDown();
        try {
            while (!this.lifeController.isShutDown()) {
                if (this.socket == null) {
                    this.buildConnect(false, this.bigPipeConf);
                    // Shutdown may have been requested while (re)connecting.
                    if (this.lifeController.isShutDown()) {
                        break;
                    }
                }
                this.doSubscribeMessage();
            }
        }
        finally {
            this.lifeController.getShutDownWait().countDown();
        }
    }

    /**
     * Reads one packet, dispatches its messages to the listener, advances
     * {@link #startPoint} and acknowledges the packet.
     *
     * <p>Every failure (timeout, I/O error, listener or parsing exception) is
     * logged and answered by closing the socket, which makes the worker loop
     * reconnect and re-subscribe.
     */
    private void doSubscribeMessage() {
        try {
            ByteBuffer buffer = this.reciever.blockRecieve(this.socket);
            if (buffer == null) {
                LOGGER.info("read data from bigpipe failed, resubscribe.");
                this.safeCloseTcpConnect();
                return;
            }
            // Single-element arrays act as out-parameters of extract().
            long[] positionHolder = new long[]{0L};
            BigpipePBProtocol.MessageCommand[] commandHolder = new BigpipePBProtocol.MessageCommand[]{null};
            List<Object> messages = this.extract(buffer, positionHolder, commandHolder);
            if (commandHolder[0] == null) {
                // The command could not be parsed; the stream is unusable.
                this.safeCloseTcpConnect();
                return;
            }
            if (!messages.isEmpty()) {
                this.messageListener.handle(messages, positionHolder[0]);
                // Resume after the message that was just handled.
                this.startPoint = positionHolder[0] + 1L;
            } else {
                LOGGER.info("recieve null message.");
            }
            this.continueNextMessage(commandHolder[0]);
        }
        catch (SocketTimeoutException e) {
            LOGGER.warn("read timeout from bigpipe failed, resubscribe. It may be no new message arrives for a period time.");
            this.safeCloseTcpConnect();
        }
        catch (IOException e) {
            LOGGER.error("read from bigpipe failed,unkown io error, resubscribe.", e);
            this.safeCloseTcpConnect();
        }
        catch (Exception e) {
            // Covers runtime failures too (listener errors, malformed frames, ...).
            LOGGER.error("read from bigpipe failed,unkown error, resubscribe.", e);
            this.safeCloseTcpConnect();
        }
    }

    /**
     * Sends the ACK for {@code messageComand}; a write failure closes the
     * socket so the worker loop reconnects.
     */
    private void continueNextMessage(BigpipePBProtocol.MessageCommand messageComand) {
        BigpipePBProtocol.BigpipeCommand ack = this.buildResponseMessage(messageComand);
        ByteBuffer frame = this.transStrategy.buildSimpleCommand(ack.toByteArray());
        try {
            this.sendByteBufferData2Socket(this.socket, frame);
        }
        catch (IOException e) {
            LOGGER.error("write to bigpipe failed,unkown error, resubscribe.", e);
            this.safeCloseTcpConnect();
        }
    }

    /**
     * Builds a {@code BMQ_ACK} command echoing the destination, topic message
     * id and receipt id of the received message.
     */
    private BigpipePBProtocol.BigpipeCommand buildResponseMessage(BigpipePBProtocol.MessageCommand messageComand) {
        BigpipePBProtocol.BigpipeCommand.Builder cmd = BigpipePBProtocol.BigpipeCommand.newBuilder();
        cmd.setType(BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_ACK);
        BigpipePBProtocol.AckCommand.Builder ackCmd = cmd.getAckBuilder();
        ackCmd.setDestination(messageComand.getDestination());
        // NOTE(review): the meaning of ack type 1 is defined by the protocol, not visible here.
        ackCmd.setAckType(1);
        ackCmd.setTopicMessageId(messageComand.getTopicMessageId());
        ackCmd.setReceiptId(messageComand.getReceiptId());
        return cmd.build();
    }

    /**
     * Decodes one received packet.
     *
     * <p>Layout as read here (little endian): {@code int} command length, the
     * protobuf command bytes, one {@code int} that is only logged, then
     * repeated ({@code int} body length, body bytes) entries while more than
     * four bytes remain. A body length larger than the remaining bytes is
     * clamped to what is left.
     *
     * @param buf         packet to decode; its byte order is switched to little endian
     * @param pointHolder slot 0 receives the topic message id of a {@code BMQ_MESSAGE}
     * @param msgHolder   slot 0 receives the message command; it stays untouched
     *                    when the command bytes cannot be parsed
     * @return the converted bodies; empty on parse failure or when the command
     *         is not a {@code BMQ_MESSAGE}
     */
    private List<Object> extract(ByteBuffer buf, long[] pointHolder, BigpipePBProtocol.MessageCommand[] msgHolder) {
        List<Object> bodies = new LinkedList<Object>();
        buf.order(ByteOrder.LITTLE_ENDIAN);
        byte[] commandBytes = new byte[buf.getInt()];
        buf.get(commandBytes);
        BigpipePBProtocol.BigpipeCommand cmd;
        try {
            cmd = BigpipePBProtocol.BigpipeCommand.parseFrom(commandBytes);
        }
        catch (InvalidProtocolBufferException e) {
            LOGGER.error("BigpipeCommand.parseFrom error.", e);
            return bodies;
        }
        msgHolder[0] = cmd.getMessage();
        if (cmd.getType() != BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_MESSAGE) {
            LOGGER.error(cmd.getError().toString());
            return bodies;
        }
        pointHolder[0] = cmd.getMessage().getTopicMessageId();
        int size = buf.getInt();
        LOGGER.info("body size:{}", Integer.valueOf(size));
        while (buf.remaining() > 4) {
            int bodySize = buf.getInt();
            if (bodySize > buf.remaining()) {
                // Never read past the end of the packet.
                bodySize = buf.remaining();
            }
            byte[] body = new byte[bodySize];
            buf.get(body);
            bodies.add(this.bodyConverter.bin2Object(body));
        }
        return bodies;
    }

    /**
     * Opens the first connection on the calling thread, starts the worker
     * thread and blocks until the worker reports that it is running.
     *
     * @throws RuntimeException if the calling thread is interrupted while
     *                          waiting; the interrupt status is restored first
     */
    @Override
    public void start(BigPipeConf conf) {
        this.buildConnect(true, conf);
        Thread worker = new Thread(new Runnable(){

            @Override
            public void run() {
                AsynchronousSubscriberBioImpl.this.threadCoreFunction();
            }
        });
        String threadName = "bigpipe-client-thread-";
        if (conf.getThreadName() != null) {
            threadName = threadName + conf.getThreadName() + "-";
        }
        worker.setName(threadName + worker.getId());
        worker.start();
        try {
            this.lifeController.getThreadRunning().await();
        }
        catch (InterruptedException e) {
            LOGGER.error("start thread error.", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Maps the {@code -1} start point to {@link Long#MAX_VALUE} for address
     * lookup; every other value is returned unchanged.
     */
    private long convertPositionFromFixedStartPoint(long point) {
        if (point == START_POINT_MAX_POSITION) {
            return Long.MAX_VALUE;
        }
        return point;
    }

    /**
     * Back-off between reconnect attempts: 500 ms for the first attempts
     * ({@code cnt < 5}), 60 s afterwards.
     *
     * @param cnt number of failed attempts so far
     */
    @Override
    protected void waitingForConnect(int cnt) {
        LOGGER.warn("reconnect bigpipe failed.{} times", Integer.valueOf(cnt));
        long sleep = cnt < FAST_RETRY_LIMIT ? FAST_RETRY_SLEEP_MILLIS : SLOW_RETRY_SLEEP_MILLIS;
        if (cnt > FAST_RETRY_LIMIT) {
            LOGGER.warn("WARNING:failed to reconnect bigpipe more than 60s.");
        }
        try {
            Thread.sleep(sleep);
        }
        catch (InterruptedException ignored) {
            // NOTE(review): the interrupt status is deliberately not restored. The
            // retry loop that calls this method is not visible here; with the flag
            // set, every later sleep would presumably return at once and the loop
            // would spin without any back-off.
            LOGGER.info("reconnect back-off interrupted, retrying immediately.");
        }
    }

    /**
     * Sends the {@code BMQ_SUBSCRIBE} command for the current topic and start
     * point and validates that the server answers with a {@code BMQ_RECEIPT}.
     *
     * @throws IOException      if writing or reading the socket fails
     * @throws RuntimeException if no answer is read or the answer is not a receipt
     */
    private void sendSubscribeCommand(Socket socket, Receiver reciever) throws IOException {
        // Read the volatile field once so the logged and the sent value agree.
        long subscribeFrom = this.startPoint;
        BigpipePBProtocol.BigpipeCommand.Builder cmd = BigpipePBProtocol.BigpipeCommand.newBuilder();
        cmd.setType(BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_SUBSCRIBE);
        BigpipePBProtocol.SubscribeCommand.Builder subCmd = cmd.getSubscribeBuilder();
        subCmd.setDestination(this.pipeRuntime.getTopicName());
        subCmd.setStartPoint(subscribeFrom);
        LOGGER.info("subscribe start point:{}", Long.valueOf(subscribeFrom));
        subCmd.setReceiptId(System.currentTimeMillis() + "-" + Math.random());
        ByteBuffer request = this.transStrategy.buildSimpleCommand(cmd.build().toByteArray());
        this.sendByteBufferData2Socket(socket, request);
        ByteBuffer response = reciever.blockRecieve(socket);
        if (response == null) {
            throw new RuntimeException("connect session failed");
        }
        BigpipePacket receipt = AbstractSessionSocketStream.parseCommand(response);
        if (receipt.getCommand().getType() != BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_RECEIPT) {
            throw new RuntimeException(receipt.getCommand().getError().toString());
        }
    }

    /**
     * Writes the first {@code buf.limit()} bytes of the buffer's backing array
     * to the socket.
     * NOTE(review): assumes an array-backed buffer whose content starts at
     * array index 0 - confirm against the TransportStrategy implementations.
     */
    private void sendByteBufferData2Socket(Socket socket, ByteBuffer buf) throws IOException {
        socket.getOutputStream().write(buf.array(), 0, buf.limit());
    }

    /**
     * Chooses the initial position: one past the stored position when a
     * position store is configured and has a value, otherwise the configured
     * default start point.
     */
    private long initStartPoint(BigPipeConf conf) {
        if (this.positionStore != null) {
            Long stored = this.positionStore.loadPosition();
            if (stored != null) {
                return stored.longValue() + 1L;
            }
        }
        return conf.getDefStartPoint();
    }

    /**
     * Turns the configured position into the concrete {@link #startPoint}.
     * {@code -2} is replaced by the begin position of the stripe the name
     * service returns for position 0; any other value is used as is.
     *
     * @throws NameResolveException if no stripe is found for the {@code -2} case
     */
    private void ensureStartPoint(long point) throws NameResolveException, KeeperException {
        if (point != START_POINT_STRIPE_BEGIN) {
            this.startPoint = point;
            return;
        }
        NameService ns = this.pipeRuntime.getNs();
        String pipeletName = this.getPipeletOrQueueName();
        TopicAddress address = ns.lookupForSub(pipeletName, 0L);
        if (address == null) {
            throw new NameResolveException(pipeletName, Long.MAX_VALUE, "sub stripe not found");
        }
        this.startPoint = address.getStripe().getBeginPos();
    }

    /**
     * Connects a new socket to {@code addr} and wraps it in a session stream
     * that sends the subscribe command once the session is created.
     *
     * <p>The socket is published in {@link #socket} before connecting so that
     * {@link #shutDown()} can close it while the connect is still in progress.
     * If connecting or configuring fails, the socket is closed and the field is
     * cleared before the exception is rethrown.
     */
    @Override
    protected SessionSocketStream openStream(InetAddress addr, SocketConf conf) throws IOException {
        Socket created = new Socket();
        this.socket = created;
        try {
            created.connect(addr.getAddress(), conf.getConectTimeout());
            created.setSoTimeout(conf.getIoTimeout());
        }
        catch (IOException e) {
            // Do not leave a dead socket behind for the worker loop.
            this.safeCloseTcpConnect();
            throw e;
        }
        return new AbstractSessionSocketStream(){

            @Override
            protected Socket buildSocketIfNotExist() {
                return AsynchronousSubscriberBioImpl.this.socket;
            }

            @Override
            protected TransportStrategy getTransStrategy() {
                return AsynchronousSubscriberBioImpl.this.transStrategy;
            }

            @Override
            protected Receiver getReceiver() {
                return AsynchronousSubscriberBioImpl.this.reciever;
            }

            @Override
            protected int getRole() {
                // NOTE(review): role code 5 is interpreted by the session layer - presumably "subscriber"; confirm.
                return 5;
            }

            @Override
            protected void afterCreateSession() throws IOException {
                AsynchronousSubscriberBioImpl.this.sendSubscribeCommand(AsynchronousSubscriberBioImpl.this.socket, AsynchronousSubscriberBioImpl.this.reciever);
            }
        };
    }

    /** Looks up the broker address serving the current start point. */
    @Override
    protected TopicAddress lookupAddr(NameService ns, String pipeletName) throws NameResolveException, KeeperException {
        return ns.lookupForSub(pipeletName, this.convertPositionFromFixedStartPoint(this.startPoint));
    }

    /** Intentionally a no-op for this subscriber. */
    @Override
    protected void handleFastFailed(boolean needBlocking) {
    }

    /**
     * Closes the current socket, if any, and clears {@link #socket}.
     *
     * <p>The field is read once into a local because this method can be
     * called from the worker thread and from {@link #shutDown()} at the same
     * time; re-reading the field after the null check could hit {@code null}.
     */
    @Override
    protected void safeCloseTcpConnect() {
        Socket current = this.socket;
        if (current == null) {
            return;
        }
        try {
            current.close();
        }
        catch (IOException e) {
            LOGGER.info("safeCloseTcpConnect", e);
        }
        finally {
            this.socket = null;
        }
    }
}

