/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.bigpipe.transport.pub;

import com.baidu.bigpipe.protocol.meta.concept.InetAddress;
import com.baidu.bigpipe.transport.AbstractSessionSocketStream;
import com.baidu.bigpipe.transport.PipeRuntime;
import com.baidu.bigpipe.transport.PipeletInfo;
import com.baidu.bigpipe.transport.Receiver;
import com.baidu.bigpipe.transport.SessionSocketStream;
import com.baidu.bigpipe.transport.TransportStrategy;
import com.baidu.bigpipe.transport.conf.SocketConf;
import com.baidu.bigpipe.transport.pub.AbstractNioPublisher;
import com.baidu.bigpipe.transport.pub.AbstractNioSession;
import com.baidu.bigpipe.transport.pub.AsynchronousPublisher;
import com.baidu.bigpipe.transport.pub.context.ReadContext;
import com.baidu.bigpipe.transport.pub.context.ReadState;
import com.baidu.bigpipe.transport.pub.context.WriteState;
import com.baidu.bigpipe.transport.pub.context.WriteTask;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsynchronousPublisherImpl
extends AbstractNioPublisher
implements AsynchronousPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsynchronousPublisherImpl.class);

    @Override
    protected void handleSelectorException(SelectionKey key) {
        this.sessionRuntime.hasError = true;
        super.safeCloseTcpConnect();
        this.pubStrategy.fastFailed(this.pipeRuntime.getSessionIdProvider());
        this.sessionRuntime.needOpenTcp = true;
        this.sessionRuntime.hasError = false;
    }

    @Override
    protected WriteState write(SelectionKey k) {
        AbstractNioSession.AttachHolder holder = (AbstractNioSession.AttachHolder)k.attachment();
        if (holder == null || holder.writeTask == null) {
            return WriteState.NoTask;
        }
        SocketChannel chnl = (SocketChannel)k.channel();
        try {
            chnl.write(holder.writeTask.getBuf());
            if (holder.writeTask.isReConnectAndNoSend()) {
                holder.writeTask.setReConnectAndNoSend(false);
            }
            if (holder.writeTask.getBuf().remaining() == 0) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("write finish for logid {},session id is {} ", (Object)holder.writeTask.getLogId(), (Object)holder.writeTask.getSessionMessageId());
                }
                holder.writeTask = null;
                return WriteState.Finish;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("writing for logid {},session id is {} ", (Object)holder.writeTask.getLogId(), (Object)holder.writeTask.getSessionMessageId());
            }
            return WriteState.Writing;
        }
        catch (IOException e) {
            LOGGER.info("write", (Throwable)e);
            this.handleSelectorException(k);
            return WriteState.Error;
        }
    }

    @Override
    protected ReadState read(SelectionKey k) {
        AbstractNioSession.AttachHolder holder = (AbstractNioSession.AttachHolder)k.attachment();
        SocketChannel chnl = (SocketChannel)k.channel();
        if (holder.rc == null) {
            holder.rc = new ReadContext();
        }
        try {
            this.reciever.recieve(chnl, holder.rc);
            if (holder.rc.isEOF()) {
                LOGGER.info("recieve data with eof!!!");
                this.handleFastFailed(true);
                holder.rc = null;
                holder.writeTask = null;
                return ReadState.ReadEOF;
            }
            if (holder.rc.isComplete()) {
                holder.rc.getBuf().flip();
                this.pubStrategy.finishPub(holder.rc.getBuf(), this.pipeRuntime.getSessionIdProvider());
                holder.rc = null;
                return ReadState.Finish;
            }
        }
        catch (IOException e) {
            LOGGER.info("read", (Throwable)e);
            this.handleFastFailed(true);
            holder.rc = null;
            holder.writeTask = null;
            return ReadState.Error;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("reading for logid {},session id is {} ", (Object)holder.writeTask.getLogId(), (Object)holder.writeTask.getSessionMessageId());
        }
        return ReadState.Reading;
    }

    @Override
    protected WriteTask startNewTask() {
        WriteTask task;
        long msgId = this.pipeRuntime.getSessionMessageId();
        if ((task = this.pubStrategy.getNextTask(this.idGen, ++msgId, this.pipeRuntime.getSessionIdProvider().getSessionId(false), this.pipeRuntime.getTopicName())) != null) {
            if (task.getCount() == 0) {
                this.pipeRuntime.setSessionMessageId(msgId);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("new task messageid {}", (Object)task.getSessionMessageId());
                }
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("repeat task messageid {}", (Object)task.getSessionMessageId());
            }
            return task;
        }
        return null;
    }

    @Override
    protected void ensureTcp() {
        this.buildConnect(false, this.socketConf);
    }

    @Override
    protected void configTask(WriteTask task, boolean isNeedTcp) {
        if (!isNeedTcp && this.pubStrategy.getCurrentTaskCount() == 1) {
            task.setReConnectAndNoSend(true);
        }
    }

    @Override
    protected void handleFastFailed(boolean needBlocking) {
        if (needBlocking) {
            this.sessionRuntime.hasError = true;
        }
        super.safeCloseTcpConnect();
        this.pubStrategy.fastFailed(this.pipeRuntime.getSessionIdProvider());
        if (needBlocking) {
            this.sessionRuntime.hasError = false;
        }
    }

    @Override
    protected void handleTimeout() {
        if (this.pubStrategy.handlePubTimeout(this.pipeRuntime.getSessionIdProvider())) {
            LOGGER.info("send time out, and close socket....");
            this.safeCloseTcpConnect();
            this.sessionRuntime.needOpenTcp = true;
        }
    }

    @Override
    protected SessionSocketStream openStream(InetAddress addr, SocketConf conf) throws IOException {
        this.tcpConnect = SocketChannel.open();
        LOGGER.warn("stripe address--[{}:{}]", (Object)addr.getAddress().getHostName(), (Object)addr.getAddress().getPort());
        this.tcpConnect.configureBlocking(true);
        this.tcpConnect.socket().connect(addr.getAddress(), conf.getConectTimeout());
        this.tcpConnect.socket().setSoTimeout(conf.getIoTimeout());
        return new AbstractSessionSocketStream(){

            @Override
            public void connectSession(PipeRuntime pipeRuntime, PipeletInfo pipeletInfo) throws IOException {
                super.connectSession(pipeRuntime, pipeletInfo);
                AsynchronousPublisherImpl.this.tcpConnect.configureBlocking(false);
                LOGGER.info("register socket to nio. " + pipeletInfo.getPipeletName());
                SelectionKey key = AsynchronousPublisherImpl.this.tcpConnect.register(AsynchronousPublisherImpl.this.selector, 1);
                key.attach(new AbstractNioSession.AttachHolder());
                AsynchronousPublisherImpl.this.sessionRuntime.waitTask = true;
            }

            @Override
            protected Socket buildSocketIfNotExist() {
                return AsynchronousPublisherImpl.this.tcpConnect.socket();
            }

            @Override
            protected TransportStrategy getTransStrategy() {
                return AsynchronousPublisherImpl.this.pubStrategy;
            }

            @Override
            protected Receiver getReceiver() {
                return AsynchronousPublisherImpl.this.reciever;
            }

            @Override
            protected int getRole() {
                return 4;
            }

            @Override
            protected void afterCreateSession() throws IOException {
            }
        };
    }
}

