/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.bigpipe.transport.pub;

import com.baidu.bigpipe.protocol.LogIdGen;
import com.baidu.bigpipe.protocol.SessionIdProvider;
import com.baidu.bigpipe.protocol.pb.BigpipePBProtocol;
import com.baidu.bigpipe.transport.NSHead;
import com.baidu.bigpipe.transport.pub.AbstractPublishStrategy;
import com.baidu.bigpipe.transport.pub.InternalFutrue;
import com.baidu.bigpipe.transport.pub.Message;
import com.baidu.bigpipe.transport.pub.PublishStrategy;
import com.baidu.bigpipe.transport.pub.context.WriteTask;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.Adler32;

/**
 * Publish strategy that queues submitted messages as tasks (one message or a
 * list of messages per task) and hands them out one message at a time.
 *
 * <p>A task moves from the waiting queue into the running map when its first
 * message is handed out by {@link #getNextTask}. It stays there until every
 * message of the task has been acknowledged through {@code handleFinish}, or
 * until the task is failed by a timeout, an I/O failure or a shutdown.
 *
 * <p>Token accounting as visible in this class: a waiting task that times out
 * releases one token per message, and every acknowledged message releases one
 * token. A running task that is failed therefore releases tokens only for the
 * messages that have not been acknowledged yet.
 *
 * <p>NOTE(review): the running map is a plain {@code LinkedHashMap} and this
 * class does no locking of its own; confirm that all callers touching running
 * tasks are on a single thread.
 */
public class GroupPublishStrategy
extends AbstractPublishStrategy
implements PublishStrategy {
    /** Value handed to {@code setMaxConcurrent} by the constructor. */
    private static final int MAX_CONCURRENT = 2000;
    /** Bytes at the front of every frame that are filled from {@code NSHead.toBytes()}. */
    private static final int HEADER_LENGTH = 36;
    /** Size of the int that stores the serialized command length. */
    private static final int COMMAND_LENGTH_FIELD = 4;
    /** Size of the two ints written in front of a body that is not already packed. */
    private static final int UNPACKED_BODY_PREFIX = 8;

    /** Tasks that have been submitted but whose first message has not been handed out yet. */
    private final BlockingQueue<PubTask> taskQueue = new LinkedBlockingQueue<PubTask>();
    /** Tasks with at least one message handed out, keyed by group log id, in start order. */
    private final Map<String, PubTask> runingMap = new LinkedHashMap<String, PubTask>();

    public GroupPublishStrategy() {
        super.setMaxConcurrent(MAX_CONCURRENT);
    }

    /**
     * Queues a single message as its own task. The message's log id doubles as
     * the group log id and the message's own future is used for notification.
     *
     * @param msg message to publish
     */
    @Override
    public void submitMessage(Message msg) {
        PubTask pt = new PubTask();
        pt.pubTime = System.currentTimeMillis();
        pt.list.add(msg);
        pt.futrue = msg.future;
        pt.logId = msg.getLogId();
        this.taskQueue.offer(pt);
    }

    /**
     * Queues a list of messages as one task. The group log id is generated
     * later, when the task's first message is handed out.
     *
     * <p>The list is stored as-is (no copy). It must not be empty:
     * {@link #getNextTask} reads the first element of a freshly started task
     * unconditionally.
     *
     * @param msgList messages that form the group
     * @param futrue  future triggered when the group completes or fails
     */
    @Override
    public void submitMessage(List<Message> msgList, InternalFutrue futrue) {
        PubTask pt = new PubTask();
        pt.pubTime = System.currentTimeMillis();
        pt.list = msgList;
        pt.futrue = futrue;
        this.taskQueue.offer(pt);
    }

    /**
     * Produces the next message to write.
     *
     * <p>Running tasks that still have unsent messages are served first, in the
     * order they were started. Only when none has anything left is a new task
     * taken from the waiting queue, and only if {@code canRunTask()} allows it.
     *
     * @return the packed write task, or {@code null} when nothing can be sent now
     */
    @Override
    public WriteTask getNextTask(LogIdGen idGen, long messageId, String sessionId, String topicName) {
        PubTask current = null;
        for (PubTask running : this.runingMap.values()) {
            if (running.index < running.list.size()) {
                current = running;
                break;
            }
        }
        if (current == null) {
            if (!super.canRunTask()) {
                return null;
            }
            current = this.taskQueue.poll();
            if (current == null) {
                return null;
            }
            super.registerRunTask();
            if (current.logId == null) {
                current.logId = idGen.genId() + "";
            }
            this.runingMap.put(current.logId, current);
        }
        Message msg = current.list.get(current.index);
        ++current.index;
        current.writeTime = System.currentTimeMillis();

        WriteTask task = new WriteTask();
        // Receipt id format "<groupLogId>.<messageLogId>"; handleFinish splits it again.
        task.setLogId(current.logId + "." + msg.getLogId());
        task.setBuf(this.packMessage(msg, task.getLogId(), messageId, sessionId, topicName));
        task.setSessionMessageId(messageId);
        return task;
    }

    /**
     * Fails every running task and then every task still waiting in the queue,
     * reporting "shut down" for the waiting ones.
     */
    @Override
    public void handleShutDown(SessionIdProvider sessionIdProvider) {
        this.fastFailedRunning(sessionIdProvider);
        PubTask pt;
        // poll() alone is enough: it returns null once the queue is drained.
        while ((pt = this.taskQueue.poll()) != null) {
            pt.futrue.trigger(pt.list, "shut down", sessionIdProvider.getSessionId(false));
        }
    }

    /** Fails all running tasks; waiting tasks are left in the queue. */
    @Override
    public void fastFailed(SessionIdProvider sessionIdProvider) {
        this.fastFailedRunning(sessionIdProvider);
    }

    /**
     * Fails every running task with "io error.", reporting the messages that
     * were not acknowledged, and clears the running map.
     */
    @Override
    protected void fastFailedRunning(SessionIdProvider sessionIdProvider) {
        int needRelease = 0;
        for (PubTask pt : this.runingMap.values()) {
            List<Message> failedList = this.collectFailedTaskFromTask(pt);
            // Acknowledged messages already gave their token back in handleFinish.
            needRelease += GroupPublishStrategy.unreleasedTokens(pt);
            pt.futrue.trigger(failedList, "io error.", sessionIdProvider.getSessionId(false));
            super.unRegisterRunTask();
        }
        super.releaseToken(needRelease);
        this.runingMap.clear();
    }

    /**
     * Records the acknowledgement of one message.
     *
     * <p>A receipt that is {@code null}, has no {@code '.'} separator, or names
     * a group that is no longer running is ignored without releasing a token.
     *
     * @param logId  receipt id in the form {@code <groupLogId>.<messageLogId>}
     * @param status not used by this implementation
     */
    @Override
    protected void handleFinish(String logId, long status, SessionIdProvider sessionIdProvider) {
        if (logId == null) {
            return;
        }
        // Split on the first dot only so a message log id containing dots stays intact.
        int separator = logId.indexOf('.');
        if (separator < 0) {
            return;
        }
        String groupLogId = logId.substring(0, separator);
        PubTask pt = this.runingMap.get(groupLogId);
        if (pt == null) {
            return;
        }
        pt.finList.add(logId.substring(separator + 1));
        if (pt.finList.size() == pt.list.size()) {
            this.runingMap.remove(groupLogId);
            super.unRegisterRunTask();
            List<Message> failed = Collections.emptyList();
            pt.futrue.trigger(failed, null, sessionIdProvider.getSessionId(false));
        }
        super.releaseToken();
    }

    /** Number of tokens a running task still holds: one per message not yet acknowledged. */
    private static int unreleasedTokens(PubTask pt) {
        return Math.max(0, pt.list.size() - pt.finList.size());
    }

    /** Returns the messages of {@code pt} whose log id has not been acknowledged, in list order. */
    private List<Message> collectFailedTaskFromTask(PubTask pt) {
        Set<String> finished = new HashSet<String>(pt.finList);
        LinkedList<Message> failed = new LinkedList<Message>();
        for (Message m : pt.list) {
            if (!finished.contains(m.getLogId())) {
                failed.add(m);
            }
        }
        return failed;
    }

    /**
     * Serializes one message into a frame ready for writing.
     *
     * <p>Layout, little-endian: {@value #HEADER_LENGTH} header bytes, an int
     * with the command length, the serialized command, then the message. A
     * message that is not already packed is prefixed with two ints (the
     * declared message length and the raw body length). The header's reserved
     * field carries the Adler-32 checksum of everything after the header.
     *
     * @return buffer flipped for reading, positioned at the start of the frame
     */
    private ByteBuffer packMessage(Message msg, String logId, long messageId, String sessionId, String topicName) {
        byte[] body = msg.getBody();
        int messageLength = msg.packedMessage ? body.length : UNPACKED_BODY_PREFIX + body.length;

        BigpipePBProtocol.BigpipeCommand.Builder cmd = BigpipePBProtocol.BigpipeCommand.newBuilder();
        cmd.setType(BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_SEND);
        BigpipePBProtocol.MessageCommand.Builder msgCmd = cmd.getMessageBuilder();
        msgCmd.setDestination(topicName);
        msgCmd.setNoDup(false);
        msgCmd.setSessionMessageId(messageId);
        msgCmd.setReceiptId(logId);
        msgCmd.setSessionId(sessionId);
        msgCmd.setMessageLength(messageLength);
        byte[] cmdBuff = cmd.build().toByteArray();

        ByteBuffer bbf = ByteBuffer.allocate(HEADER_LENGTH + COMMAND_LENGTH_FIELD + cmdBuff.length + messageLength);
        bbf.order(ByteOrder.LITTLE_ENDIAN);
        // Leave room for the header; it is copied in once the checksum is known.
        bbf.position(HEADER_LENGTH);
        bbf.putInt(cmdBuff.length);
        bbf.put(cmdBuff);
        if (!msg.packedMessage) {
            bbf.putInt(messageLength);
            bbf.putInt(body.length);
        }
        bbf.put(body);

        NSHead header = NSHead.factory(this.getProvider());
        header.setBodyLen(cmdBuff.length + COMMAND_LENGTH_FIELD + messageLength);
        Adler32 checksum = new Adler32();
        byte[] msgBuf = bbf.array();
        checksum.update(msgBuf, HEADER_LENGTH, (int)header.getBodyLen());
        header.setReserved((int)checksum.getValue());
        System.arraycopy(header.toBytes(), 0, msgBuf, 0, HEADER_LENGTH);
        bbf.flip();
        return bbf;
    }

    /** Returns the number of tasks currently in the running map. */
    @Override
    public int getCurrentTaskCount() {
        return this.runingMap.size();
    }

    /**
     * Fails waiting and running tasks whose timestamp is older than the
     * configured I/O timeout.
     *
     * <p>Both passes always run; a timeout among the waiting tasks must not
     * postpone the check of the running ones.
     *
     * @return always {@code false}
     */
    @Override
    public boolean handlePubTimeout(SessionIdProvider sessionIdProvider) {
        long topTime = System.currentTimeMillis() - (long)this.socketConf.getIoTimeout();
        this.handleWaitingTimeout(topTime, sessionIdProvider);
        this.handleRunningingTimeout(topTime, sessionIdProvider);
        // NOTE(review): the outcome of the two passes is deliberately not returned here
        // because the original always returned false; confirm what callers expect.
        return false;
    }

    /**
     * Fails tasks at the head of the waiting queue that were submitted at or
     * before {@code topTime}, releasing one token per message.
     *
     * @return {@code true} if at least one waiting task timed out
     */
    private boolean handleWaitingTimeout(long topTime, SessionIdProvider sessionIdProvider) {
        boolean hasTimeout = false;
        PubTask pt;
        while ((pt = this.taskQueue.peek()) != null && pt.pubTime <= topTime) {
            hasTimeout = true;
            this.taskQueue.poll();
            pt.futrue.trigger(pt.list, "timeout, waiting to send", sessionIdProvider.getSessionId(false));
            super.releaseToken(pt.list.size());
        }
        return hasTimeout;
    }

    /**
     * Fails running tasks whose last write happened at or before
     * {@code topTime}, reporting the messages that were not acknowledged.
     *
     * @return {@code true} if at least one running task timed out
     */
    private boolean handleRunningingTimeout(long topTime, SessionIdProvider sessionIdProvider) {
        boolean hasTimeout = false;
        Iterator<Map.Entry<String, PubTask>> it = this.runingMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, PubTask> entry = it.next();
            PubTask pt = entry.getValue();
            if (pt.writeTime > topTime) {
                continue;
            }
            hasTimeout = true;
            List<Message> failedList = this.collectFailedTaskFromTask(pt);
            pt.futrue.trigger(failedList, "timeout,sending,logid is " + entry.getKey(), sessionIdProvider.getSessionId(false));
            // Acknowledged messages already gave their token back in handleFinish.
            super.releaseToken(GroupPublishStrategy.unreleasedTokens(pt));
            super.unRegisterRunTask();
            it.remove();
        }
        return hasTimeout;
    }

    /** Mutable bookkeeping for one submitted task (a single message or a group). */
    private static class PubTask {
        /** Messages of the task, in send order. */
        List<Message> list = new LinkedList<Message>();
        /** Future triggered when the task completes or fails. */
        InternalFutrue futrue;
        /** Group log id; key of the task in the running map. */
        String logId;
        /** Index of the next message in {@link #list} to hand out. */
        int index = 0;
        /** Message log ids acknowledged so far. */
        List<String> finList = new LinkedList<String>();
        /** Submission time in epoch milliseconds. */
        long pubTime;
        /** Time the most recent message was handed out, in epoch milliseconds. */
        long writeTime;

        private PubTask() {
        }
    }
}

