/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.jprotobuf.pbrpc.transport.handler;

import com.baidu.jprotobuf.pbrpc.data.RpcDataPackage;
import com.baidu.jprotobuf.pbrpc.data.RpcHeadMeta;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RpcDataPackageDecoder
extends ByteToMessageDecoder {
    private static final int DEFAULT_CLEANUP_INTERVAL = 1000;
    private static Logger LOG = Logger.getLogger(RpcDataPackageDecoder.class.getName());
    private static final Map<Long, RpcDataPackage> tempTrunkPackages = new ConcurrentHashMap<Long, RpcDataPackage>();
    private static final AtomicBoolean startChunkPackageCleanUp = new AtomicBoolean(false);
    private ExecutorService es;
    private boolean stopChunkPackageTimeoutClean = false;

    public RpcDataPackageDecoder(final int chunkPackageTimeout) {
        if (chunkPackageTimeout <= 0) {
            return;
        }
        if (startChunkPackageCleanUp.compareAndSet(false, true)) {
            this.es = Executors.newSingleThreadExecutor();
            this.es.execute(new Runnable(){

                @Override
                public void run() {
                    while (!RpcDataPackageDecoder.this.stopChunkPackageTimeoutClean) {
                        if (!tempTrunkPackages.isEmpty()) {
                            HashMap currentCheckPackage = new HashMap(tempTrunkPackages);
                            for (Map.Entry entry : currentCheckPackage.entrySet()) {
                                if (((RpcDataPackage)entry.getValue()).getTimeStamp() + (long)chunkPackageTimeout <= System.currentTimeMillis()) continue;
                                tempTrunkPackages.remove(entry.getValue());
                                LOG.log(Level.SEVERE, "Found chunk package time out long than " + chunkPackageTimeout + "(ms) will clean up correlationId:" + ((RpcDataPackage)entry.getValue()).getRpcMeta().getCorrelationId());
                            }
                        }
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (Exception e) {
                            LOG.log(Level.SEVERE, e.getMessage(), e);
                        }
                    }
                }
            });
        }
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = this.decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
        if (buf.readableBytes() < 12) {
            return null;
        }
        buf.markReaderIndex();
        long rpcMessageDecoderStart = System.nanoTime();
        ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), 12);
        byte[] bytes = new byte[12];
        buffer.get(bytes);
        RpcHeadMeta headMeta = new RpcHeadMeta();
        headMeta.read(bytes);
        int messageSize = headMeta.getMessageSize() + 12;
        if (buf.readableBytes() < messageSize) {
            buf.resetReaderIndex();
            return null;
        }
        String magicCode = headMeta.getMagicCodeAsString();
        if (!"PRPC".equals(magicCode)) {
            throw new Exception("Error magic code:" + magicCode);
        }
        byte[] totalBytes = new byte[messageSize];
        buf.readBytes(totalBytes, 0, messageSize);
        RpcDataPackage rpcDataPackage = new RpcDataPackage();
        rpcDataPackage.setTimeStamp(System.currentTimeMillis());
        rpcDataPackage.read(totalBytes);
        if (rpcDataPackage.isChunkPackage()) {
            Long chunkStreamId = rpcDataPackage.getChunkStreamId();
            RpcDataPackage chunkDataPackage = tempTrunkPackages.get(chunkStreamId);
            if (chunkDataPackage == null) {
                chunkDataPackage = rpcDataPackage;
                tempTrunkPackages.put(chunkStreamId, rpcDataPackage);
            } else {
                chunkDataPackage.mergeData(rpcDataPackage.getData());
            }
            if (rpcDataPackage.isFinalPackage()) {
                chunkDataPackage.chunkInfo(chunkStreamId, -1);
                tempTrunkPackages.remove(chunkStreamId);
                return chunkDataPackage;
            }
            return null;
        }
        long rpcMessageDecoderEnd = System.nanoTime();
        LOG.log(Level.FINE, "[profiling] nshead decode cost : " + (rpcMessageDecoderEnd - rpcMessageDecoderStart) / 1000L);
        return rpcDataPackage;
    }

    public void close() {
        this.stopChunkPackageTimeoutClean = true;
        if (this.es != null) {
            this.es.shutdown();
        }
    }
}

