/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.bigpipe.transport.pub;

import com.baidu.bigpipe.protocol.meta.NameService;
import com.baidu.bigpipe.protocol.meta.concept.TopicAddress;
import com.baidu.bigpipe.protocol.meta.exp.NameResolveException;
import com.baidu.bigpipe.transport.BigpipeSessionSupport;
import com.baidu.bigpipe.transport.Receiver;
import com.baidu.bigpipe.transport.conf.BigPipeConf;
import com.baidu.bigpipe.transport.conf.SocketConf;
import com.baidu.bigpipe.transport.pub.context.ReadContext;
import com.baidu.bigpipe.transport.pub.context.ReadState;
import com.baidu.bigpipe.transport.pub.context.WriteState;
import com.baidu.bigpipe.transport.pub.context.WriteTask;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNioSession
extends BigpipeSessionSupport {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNioSession.class);
    protected boolean duplexMode = true;
    protected volatile Selector selector;
    private AtomicBoolean reBuilding;
    protected AtomicBoolean wakenUp;
    private boolean avoidEpollBug;
    protected volatile int selecttimeout;
    protected int publisherTryMaxCount;
    protected final SessionRuntime sessionRuntime;
    protected volatile SocketChannel tcpConnect;
    protected volatile SocketConf socketConf;

    public AbstractNioSession() {
        try {
            this.selector = Selector.open();
        }
        catch (IOException e) {
            LOGGER.error("init failed.", (Throwable)e);
            throw new RuntimeException(e);
        }
        this.reBuilding = new AtomicBoolean();
        this.wakenUp = new AtomicBoolean();
        this.avoidEpollBug = true;
        this.selecttimeout = 500;
        this.publisherTryMaxCount = 3;
        this.sessionRuntime = new SessionRuntime();
    }

    public SocketConf getSocketConf() {
        return this.socketConf;
    }

    public void setSocketConf(SocketConf socketConf) {
        this.socketConf = socketConf;
    }

    public Receiver getReciever() {
        return this.reciever;
    }

    public void setReciever(Receiver reciever) {
        this.reciever = reciever;
    }

    public boolean isDuplexMode() {
        return this.duplexMode;
    }

    public void setDuplexMode(boolean duplexMode) {
        this.duplexMode = duplexMode;
    }

    public int getPublisherTryMaxCount() {
        return this.publisherTryMaxCount;
    }

    public void setPublisherTryMaxCount(int publisherTryMaxCount) {
        this.publisherTryMaxCount = publisherTryMaxCount;
    }

    public void shutDown() {
        this.lifeController.setShutDown(true);
        try {
            this.lifeController.getShutDownWait().await(this.socketConf.getShutDownTimeout(), TimeUnit.MINUTES);
            try {
                this.selector.close();
            }
            catch (IOException e) {
                LOGGER.error("shutdown error, ignore.", (Throwable)e);
            }
            this.safeCloseTcpConnect();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    protected void continueConfig(BigPipeConf conf) {
        this.selecttimeout = conf.getSelecttimeout();
    }

    @Override
    protected TopicAddress lookupAddr(NameService ns, String pipeletName) throws NameResolveException, KeeperException {
        return ns.lookupForPub(pipeletName);
    }

    protected abstract void handleSelectorException(SelectionKey var1);

    protected abstract void handleTimeout();

    protected abstract WriteState write(SelectionKey var1);

    protected abstract ReadState read(SelectionKey var1);

    protected abstract WriteTask startNewTask();

    protected abstract void configTask(WriteTask var1, boolean var2);

    protected abstract void ensureTcp();

    protected abstract void handleShutDown();

    @Override
    public final void start(BigPipeConf conf) {
        this.socketConf = conf;
        this.buildConnect(true, conf);
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                AbstractNioSession.this.runningFunction();
            }
        });
        String threadName = "bigpipe-client-thread-";
        if (conf.getThreadName() != null) {
            threadName = threadName + conf.getThreadName() + "-";
        }
        t.setName(threadName + t.getId());
        t.start();
        try {
            this.lifeController.getThreadRunning().await();
        }
        catch (InterruptedException e) {
            LOGGER.error("start thread error.", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected final void safeCloseTcpConnect() {
        if (this.tcpConnect == null) {
            return;
        }
        try {
            this.tcpConnect.configureBlocking(false);
            this.tcpConnect.close();
        }
        catch (IOException e) {
            LOGGER.info("safeCloseTcpConnect", (Throwable)e);
        }
        finally {
            this.tcpConnect = null;
        }
    }

    private void runningFunction() {
        int selectReturnsImmediately = 0;
        long minSelectTimeout = TimeUnit.MILLISECONDS.toNanos(this.selecttimeout) * 80L / 100L;
        boolean wakenupFromLoop = false;
        this.lifeController.getThreadRunning().countDown();
        while (true) {
            this.wakenUp.set(false);
            int selected = 0;
            long beforeSelect = System.nanoTime();
            try {
                selected = this.selector.select(this.selecttimeout);
            }
            catch (ClosedSelectorException ce) {
                throw new RuntimeException(ce);
            }
            catch (CancelledKeyException e) {
                LOGGER.info("maybe jdk epoll bug", (Throwable)e);
                selected = -1;
            }
            catch (IOException e) {
                LOGGER.info("selecor.select", (Throwable)e);
            }
            if (this.avoidEpollBug && selected == 0 && !wakenupFromLoop && !this.wakenUp.get()) {
                long timeBlocked = System.nanoTime() - beforeSelect;
                if (timeBlocked < minSelectTimeout) {
                    boolean notConnected = false;
                    for (SelectionKey key : this.selector.keys()) {
                        SelectableChannel ch = key.channel();
                        try {
                            if ((!(ch instanceof DatagramChannel) || ((DatagramChannel)ch).isConnected()) && (!(ch instanceof SocketChannel) || ((SocketChannel)ch).isConnected())) continue;
                            notConnected = true;
                            key.cancel();
                        }
                        catch (CancelledKeyException e) {}
                    }
                    selectReturnsImmediately = notConnected ? 0 : ++selectReturnsImmediately;
                } else {
                    selectReturnsImmediately = 0;
                }
                if (selectReturnsImmediately == 1024) {
                    this.rebuildSeletor();
                    selectReturnsImmediately = 0;
                    wakenupFromLoop = false;
                    continue;
                }
            } else {
                selectReturnsImmediately = 0;
            }
            if (this.wakenUp.get()) {
                wakenupFromLoop = true;
                this.selector.wakeup();
            } else {
                wakenupFromLoop = false;
            }
            if (selected > 0) {
                Set<SelectionKey> keys = this.selector.selectedKeys();
                Iterator<SelectionKey> i = keys.iterator();
                while (i.hasNext()) {
                    Enum state;
                    SelectionKey k = i.next();
                    i.remove();
                    if ((!k.isReadable() || (state = this.readCore(k)) != ReadState.ReadEOF && state != ReadState.Error) && (!k.isWritable() || (state = this.writeCore(k)) != WriteState.Error)) continue;
                    break;
                }
                if (keys.size() > 0) {
                    keys.clear();
                }
                if (this.lifeController.isShutDown()) break;
            }
            if (this.lifeController.isShutDown()) {
                this.handleShutDown();
                break;
            }
            this.handleTimeout();
            if (this.tcpConnect == null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("tcp is null, there is not task, need to wait new task.");
                }
                this.sessionRuntime.waitTask = true;
            }
            if (this.lifeController.isShutDown()) {
                this.handleShutDown();
                break;
            }
            if (!this.sessionRuntime.waitTask) continue;
            this.tryToStartNewTask();
        }
    }

    protected void tryToStartNewTask() {
        boolean reConnect = this.sessionRuntime.needOpenTcp;
        WriteTask newTask = this.startNewTask();
        if (newTask != null) {
            if (reConnect) {
                this.ensureTcp();
                this.sessionRuntime.needOpenTcp = false;
            }
            this.configTask(newTask, reConnect);
            SelectionKey k = this.tcpConnect.keyFor(this.selector);
            AttachHolder holder = (AttachHolder)k.attachment();
            if (holder == null) {
                holder = new AttachHolder();
                k.attach(holder);
            }
            holder.writeTask = newTask;
            if (reConnect) {
                k.interestOps(k.interestOps() & 0xFFFFFFFE);
            }
            this.setupSelectionKeyInterest(k);
            this.sessionRuntime.waitTask = false;
        }
    }

    private void setupSelectionKeyInterest(SelectionKey k) {
        if (this.duplexMode) {
            k.interestOps(k.interestOps() | 4);
        } else {
            k.interestOps(4);
        }
    }

    private ReadState readCore(SelectionKey k) {
        ReadState state = this.read(k);
        if (state == ReadState.ReadEOF) {
            LOGGER.info("read eof,close tcp, waiting next connect...");
            this.safeCloseTcpConnect();
            this.sessionRuntime.needOpenTcp = true;
        }
        if (state == ReadState.Finish && !this.duplexMode) {
            this.sessionRuntime.waitTask = true;
        }
        if (state == ReadState.Error) {
            LOGGER.info("read error, and close tcp, waiting next connect...");
            this.safeCloseTcpConnect();
            this.sessionRuntime.needOpenTcp = true;
        }
        return state;
    }

    private WriteState writeCore(SelectionKey k) {
        WriteState state = this.write(k);
        if (state == WriteState.NoTask) {
            this.sessionRuntime.waitTask = true;
            k.interestOps(k.interestOps() & 0xFFFFFFFB);
        }
        if (state == WriteState.Finish) {
            if (!this.duplexMode) {
                k.interestOps(1);
            } else {
                this.sessionRuntime.waitTask = true;
                k.interestOps(k.interestOps() | 1);
            }
        }
        return state;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rebuildSeletor() {
        Selector newSelector;
        Selector oldSelector = this.selector;
        try {
            newSelector = Selector.open();
        }
        catch (Exception e) {
            LOGGER.info("rebuildSeletor", (Throwable)e);
            return;
        }
        this.reBuilding.set(true);
        while (true) {
            try {
                for (SelectionKey key : oldSelector.keys()) {
                    try {
                        if (key.channel().keyFor(newSelector) != null) continue;
                        int interestOps = key.interestOps();
                        key.cancel();
                        key.channel().register(newSelector, interestOps, key.attachment());
                    }
                    catch (Exception e) {
                        LOGGER.error("rebuildSeletor", (Throwable)e);
                        this.handleSelectorException(key);
                    }
                }
            }
            catch (ConcurrentModificationException e) {
                continue;
            }
            break;
        }
        AbstractNioSession e = this;
        synchronized (e) {
            this.selector = newSelector;
        }
        this.reBuilding.set(false);
        try {
            oldSelector.close();
        }
        catch (Throwable t) {
            LOGGER.error("oldSelector.close", t);
        }
    }

    protected static class AttachHolder {
        WriteTask writeTask;
        ReadContext rc;

        protected AttachHolder() {
        }
    }

    static class SessionRuntime {
        boolean waitTask = true;
        volatile boolean hasError = false;
        boolean needOpenTcp = false;

        SessionRuntime() {
        }
    }
}

