/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.bigpipe.transport;

import com.baidu.bigpipe.protocol.meta.NameService;
import com.baidu.bigpipe.protocol.meta.ZKMetaLoader;
import com.baidu.bigpipe.protocol.meta.concept.InetAddress;
import com.baidu.bigpipe.protocol.meta.concept.QueueAddress;
import com.baidu.bigpipe.protocol.meta.concept.TopicAddress;
import com.baidu.bigpipe.protocol.meta.exp.NameResolveException;
import com.baidu.bigpipe.protocol.meta.exp.QueueLocateException;
import com.baidu.bigpipe.transport.LifeController;
import com.baidu.bigpipe.transport.NsHeadReciever;
import com.baidu.bigpipe.transport.PipeRuntime;
import com.baidu.bigpipe.transport.PipeletInfo;
import com.baidu.bigpipe.transport.Receiver;
import com.baidu.bigpipe.transport.SessionSocketStream;
import com.baidu.bigpipe.transport.conf.BigPipeConf;
import com.baidu.bigpipe.transport.conf.SocketConf;
import java.io.IOException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BigpipeSessionSupport {
    private static final Logger LOGGER = LoggerFactory.getLogger(BigpipeSessionSupport.class);
    private final PipeletInfo pipeletInfo = new PipeletInfo();
    protected final PipeRuntime pipeRuntime = new PipeRuntime();
    protected final LifeController lifeController = new LifeController();
    protected Receiver reciever = new NsHeadReciever();
    private int type = PIPE_TYPE.TOPIC.ordinal();

    public int getType() {
        return this.type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public void init(BigPipeConf conf) {
        this.pipeletInfo.setPipe(conf.getPipe());
        this.pipeletInfo.setCluster(conf.getCluster());
        this.pipeletInfo.setPipeletId(conf.getPipeletId());
        this.pipeletInfo.setUserName(conf.getUserName());
        this.pipeletInfo.setPwd(conf.getPwd());
        this.pipeletInfo.setPipeletName(conf.getPipeletName());
        this.pipeletInfo.setQueue(conf.getQueue());
        ZKMetaLoader metaLoader = new ZKMetaLoader();
        metaLoader.setConnectString(conf.getMetaString());
        NameService ns = new NameService(conf.getCluster());
        ns.setMetaLoader(metaLoader);
        this.pipeRuntime.ns = ns;
        this.pipeRuntime.reConnectMaxTimes = conf.getReConnectMaxTimes();
        this.pipeRuntime.sessionIdProvider = conf.getSessionProvider();
        this.continueConfig(conf);
        this.start(conf);
    }

    public abstract void start(BigPipeConf var1);

    protected abstract void continueConfig(BigPipeConf var1);

    protected abstract InetAddress lookupAddr(NameService var1, String var2) throws NameResolveException, KeeperException, QueueLocateException;

    protected abstract void safeCloseTcpConnect();

    protected abstract void waitingForConnect(int var1);

    protected abstract SessionSocketStream openStream(InetAddress var1, SocketConf var2) throws IOException;

    protected abstract void handleFastFailed(boolean var1);

    protected final void buildConnect(boolean init, SocketConf conf) {
        this.safeCloseTcpConnect();
        this.doConnect(init, conf);
    }

    private void doConnect(boolean init, SocketConf conf) {
        NameService ns = this.pipeRuntime.ns;
        int cnt = 0;
        int connectSessionFailedCount = 0;
        InetAddress addr = null;
        String pipeletOrQueueName = this.getPipeletOrQueueName();
        int zkCount = 0;
        int maxReConnectTimes = this.pipeRuntime.reConnectMaxTimes;
        if (init) {
            int n = maxReConnectTimes = this.pipeRuntime.reConnectMaxTimes > 5 ? 5 : this.pipeRuntime.reConnectMaxTimes;
            if (this.pipeRuntime.reConnectMaxTimes < 3600) {
                LOGGER.info("you should configure reConnectMaxTimes more than 3600");
            }
        }
        LOGGER.info("start to doConnect...");
        while (cnt < maxReConnectTimes) {
            if (this.lifeController.isShutDown()) {
                LOGGER.info("user set shut down when doConnect, bye.");
                return;
            }
            try {
                LOGGER.info("start to lookup for " + pipeletOrQueueName);
                addr = this.lookupAddr(ns, pipeletOrQueueName);
                LOGGER.info("after lookup for " + pipeletOrQueueName);
            }
            catch (NameResolveException e) {
                LOGGER.error("get address from zk error,may be invalid parameters.", (Throwable)e);
                this.waitingForConnect(cnt);
                LOGGER.info("get address from zk error, zkCount= {}", (Object)zkCount);
                ++zkCount;
                continue;
            }
            catch (KeeperException e) {
                LOGGER.error("get address from zk error,may be lost zookeeper server.", (Throwable)e);
                this.waitingForConnect(cnt);
                LOGGER.info("may be lost zookeeper server,zkCount= {}", (Object)zkCount);
                ++zkCount;
                continue;
            }
            catch (Exception e) {
                LOGGER.error("get address from zk error,unknown error.", (Throwable)e);
                this.waitingForConnect(cnt);
                LOGGER.info("unknown error,zkCount= {}", (Object)zkCount);
                ++zkCount;
                continue;
            }
            SessionSocketStream sessionStream = null;
            if (addr != null) {
                try {
                    LOGGER.info("open socket " + pipeletOrQueueName);
                    sessionStream = this.openStream(addr, conf);
                    if (this.getType() == PIPE_TYPE.QUEUE.ordinal()) {
                        QueueAddress queueAddress = (QueueAddress)addr;
                        this.pipeRuntime.queueName = queueAddress.getQueueName();
                        this.pipeRuntime.topicName = null;
                    } else {
                        TopicAddress topicAddress = (TopicAddress)addr;
                        this.pipeRuntime.topicName = topicAddress.getStripe().getName();
                        this.pipeRuntime.queueName = null;
                    }
                    LOGGER.info("doconnect ok, opened socket,try times:" + cnt);
                }
                catch (IOException e) {
                    LOGGER.error("doConnect error. try times:" + cnt, (Throwable)e);
                    this.safeCloseTcpConnect();
                    LOGGER.info("socket error,sleep 200ms");
                    try {
                        Thread.sleep(200L);
                    }
                    catch (InterruptedException e1) {
                        // empty catch block
                    }
                }
                if (sessionStream != null) {
                    try {
                        LOGGER.info("hand shake with bigpipe. " + pipeletOrQueueName);
                        boolean bl = init ? true : (this.pipeRuntime.refreshSessionId = connectSessionFailedCount > 0);
                        if (!init && this.pipeRuntime.refreshSessionId) {
                            LOGGER.info("It will refresh sesion id");
                        }
                        sessionStream.connectSession(this.pipeRuntime, this.pipeletInfo);
                        return;
                    }
                    catch (Exception e) {
                        ++connectSessionFailedCount;
                        LOGGER.error("create session error, cnt= " + cnt, (Throwable)e);
                        this.safeCloseTcpConnect();
                        LOGGER.info("create session,sleep 200ms");
                        try {
                            Thread.sleep(200L);
                        }
                        catch (InterruptedException e1) {
                            // empty catch block
                        }
                    }
                }
            }
            ++cnt;
        }
        LOGGER.info("try to create bigpipe session failed for {} times,fast failed all request.", (Object)cnt);
        if (init) {
            throw new RuntimeException("connect failed .");
        }
        this.handleFastFailed(true);
    }

    protected String getPipeletOrQueueName() {
        if (this.getType() == PIPE_TYPE.QUEUE.ordinal()) {
            return this.pipeletInfo.getQueue();
        }
        if (this.pipeletInfo.getPipeletName() != null && this.pipeletInfo.getPipeletName().length() > 0) {
            return this.pipeletInfo.getPipeletName();
        }
        return this.pipeletInfo.getPipe() + "_" + this.pipeletInfo.getPipeletId();
    }

    public static enum PIPE_TYPE {
        TOPIC,
        QUEUE;

    }
}

