/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.bigpipe.transport.pub;

import com.baidu.bigpipe.exp.PushException;
import com.baidu.bigpipe.protocol.LogIdGen;
import com.baidu.bigpipe.protocol.SequenceLogIdGen;
import com.baidu.bigpipe.transport.conf.BigPipeConf;
import com.baidu.bigpipe.transport.pub.AbstractNioSession;
import com.baidu.bigpipe.transport.pub.AsynchronousPublisher;
import com.baidu.bigpipe.transport.pub.Message;
import com.baidu.bigpipe.transport.pub.PublishStrategy;
import com.baidu.bigpipe.transport.pub.SendFutrue;
import com.baidu.bigpipe.transport.pub.SendFutrueImpl;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNioPublisher
extends AbstractNioSession
implements AsynchronousPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNioPublisher.class);
    protected PublishStrategy pubStrategy;
    protected LogIdGen idGen = new SequenceLogIdGen();

    public PublishStrategy getPubStrategy() {
        return this.pubStrategy;
    }

    public void setPubStrategy(PublishStrategy pubStrategy) {
        this.pubStrategy = pubStrategy;
    }

    @Override
    public void applyMessageIdGen(LogIdGen idGen) {
        this.idGen = idGen;
    }

    @Override
    protected void continueConfig(BigPipeConf conf) {
        super.continueConfig(conf);
        this.pubStrategy.applySocketConf(conf);
    }

    @Override
    public SendFutrue publish(Message message) throws PushException {
        final Message m = message;
        SendFutrueImpl futrue = new SendFutrueImpl();
        m.future = futrue;
        this.pushMessage(new PushMessageHandler(){

            @Override
            public void push() {
                AbstractNioPublisher.this.pubStrategy.submitMessage(m);
            }

            @Override
            public int getMessageListCount() {
                return 1;
            }
        });
        return futrue;
    }

    @Override
    public SendFutrue publish(List<Message> message) throws PushException {
        final List<Message> m = message;
        final SendFutrueImpl futrue = new SendFutrueImpl();
        this.pushMessage(new PushMessageHandler(){

            @Override
            public void push() {
                AbstractNioPublisher.this.pubStrategy.submitMessage(m, futrue);
            }

            @Override
            public int getMessageListCount() {
                return m.size();
            }
        });
        return futrue;
    }

    @Override
    protected void handleShutDown() {
        this.pubStrategy.handleShutDown(this.pipeRuntime.getSessionIdProvider());
        this.lifeController.getShutDownWait().countDown();
    }

    @Override
    public void shutDown() {
        super.shutDown();
    }

    @Override
    protected void waitingForConnect(int cnt) {
        LOGGER.info("get address from zk failed, sleep 500 ms.");
        try {
            Thread.sleep(500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void checkErrOrShutdown() throws PushException {
        if (this.sessionRuntime.hasError || this.lifeController.isShutDown()) {
            throw new PushException(this.sessionRuntime.hasError ? "error." : "shutdown.");
        }
    }

    private void pushMessage(PushMessageHandler handler) throws PushException {
        this.checkErrOrShutdown();
        try {
            this.pubStrategy.accqireToken(handler.getMessageListCount());
            this.checkErrOrShutdown();
            handler.push();
        }
        catch (InterruptedException e) {
            throw new PushException();
        }
        catch (PushException e) {
            this.pubStrategy.releaseToken(handler.getMessageListCount());
            throw e;
        }
        if (this.wakenUp.compareAndSet(false, true)) {
            this.selector.wakeup();
        }
    }

    private static interface PushMessageHandler {
        public void push();

        public int getMessageListCount();
    }
}

