/*
 * Decompiled with CFR 0.152.
 */
package com.dianping.cat.message.io;

import com.dianping.cat.ApplicationSettings;
import com.dianping.cat.analyzer.LocalAggregator;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.ChannelManager;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.NativeMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.status.StatusExtension;
import com.dianping.cat.status.StatusExtensionRegister;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Threads;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named
public class TcpSocketSender
implements Threads.Task,
MessageSender,
LogEnabled {
    public static final int SIZE = ApplicationSettings.getQueueSize();
    private static final int MAX_CHILD_NUMBER = 200;
    private static final int MAX_DURATION = 30000;
    public static final long HOUR = 3600000L;
    private MessageCodec m_codec = new NativeMessageCodec();
    @Inject
    private MessageStatistics m_statistics;
    @Inject
    private ClientConfigManager m_configManager;
    @Inject
    private MessageIdFactory m_factory;
    private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
    private MessageQueue m_atomicQueue = new DefaultMessageQueue(SIZE);
    private ChannelManager m_channelManager;
    private Logger m_logger;
    private boolean m_active;
    private AtomicInteger m_errors = new AtomicInteger();

    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    public String getName() {
        return "TcpSocketSender";
    }

    @Override
    public void initialize(List<InetSocketAddress> addresses) {
        this.m_channelManager = new ChannelManager(this.m_logger, addresses, this.m_configManager, this.m_factory);
        Threads.forGroup((String)"cat").start((Runnable)((Object)this));
        Threads.forGroup((String)"cat").start((Runnable)((Object)this.m_channelManager));
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                TcpSocketSender.this.m_logger.info("shut down cat client in runtime shut down hook!");
                TcpSocketSender.this.shutdown();
            }
        });
        StatusExtensionRegister.getInstance().register(new StatusExtension(){

            @Override
            public String getDescription() {
                return "client-send-queue";
            }

            @Override
            public String getId() {
                return "client-send-queue";
            }

            @Override
            public Map<String, String> getProperties() {
                HashMap<String, String> map = new HashMap<String, String>();
                map.put("msg-queue", String.valueOf(TcpSocketSender.this.m_queue.size()));
                map.put("atomic-queue", String.valueOf(TcpSocketSender.this.m_queue.size()));
                return map;
            }
        });
    }

    private void logQueueFullInfo(MessageTree tree) {
        int count;
        if (this.m_statistics != null) {
            this.m_statistics.onOverflowed(tree);
        }
        if ((count = this.m_errors.incrementAndGet()) % 1000 == 0 || count == 1) {
            this.m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
        }
        tree = null;
    }

    private MessageTree mergeTree(MessageQueue handler) {
        MessageTree tree;
        DefaultTransaction tran = new DefaultTransaction("System", "_CatMergeTree", null);
        MessageTree first = handler.poll();
        tran.setStatus("0");
        tran.setCompleted(true);
        tran.setDurationInMicros(0L);
        tran.addChild(first.getMessage());
        for (int max = 200; max >= 0 && (tree = handler.poll()) != null; --max) {
            tran.addChild(tree.getMessage());
        }
        ((DefaultMessageTree)first).setMessage(tran);
        return first;
    }

    private void offer(MessageTree tree) {
        if (this.m_configManager.isAtomicMessage(tree)) {
            boolean result = this.m_atomicQueue.offer(tree);
            if (!result) {
                this.logQueueFullInfo(tree);
            }
        } else {
            boolean result = this.m_queue.offer(tree);
            if (!result) {
                this.logQueueFullInfo(tree);
            }
        }
    }

    private void processAtomicMessage() {
        while (this.shouldMerge(this.m_atomicQueue)) {
            MessageTree tree = this.mergeTree(this.m_atomicQueue);
            boolean result = this.m_queue.offer(tree);
            if (result) continue;
            this.logQueueFullInfo(tree);
        }
    }

    private void processNormalMessage() {
        while (true) {
            ChannelFuture channel;
            if ((channel = this.m_channelManager.channel()) != null) {
                try {
                    MessageTree tree = this.m_queue.poll();
                    if (tree != null) {
                        this.sendInternal(channel, tree);
                        tree.setMessage(null);
                        continue;
                    }
                    try {
                        Thread.sleep(5L);
                    }
                    catch (Exception e) {
                        this.m_active = false;
                    }
                    break;
                }
                catch (Throwable t) {
                    this.m_logger.error("Error when sending message over TCP socket!", t);
                    continue;
                }
            }
            try {
                Thread.sleep(5L);
            }
            catch (Exception e) {
                this.m_active = false;
            }
        }
    }

    public void run() {
        MessageTree tree;
        this.m_active = true;
        while (this.m_active) {
            this.processAtomicMessage();
            this.processNormalMessage();
        }
        this.processAtomicMessage();
        while ((tree = this.m_queue.poll()) != null) {
            ChannelFuture channel = this.m_channelManager.channel();
            if (channel != null) {
                this.sendInternal(channel, tree);
                continue;
            }
            this.offer(tree);
        }
    }

    @Override
    public void send(MessageTree tree) {
        if (!this.m_configManager.isBlock()) {
            double sampleRatio = this.m_configManager.getSampleRatio();
            if (tree.canDiscard() && sampleRatio < 1.0 && !tree.isHitSample()) {
                this.processTreeInClient(tree);
            } else {
                this.offer(tree);
            }
        }
    }

    private void processTreeInClient(MessageTree tree) {
        LocalAggregator.aggregate(tree);
    }

    public void sendInternal(ChannelFuture channel, MessageTree tree) {
        if (tree.getMessageId() == null) {
            tree.setMessageId(this.m_factory.getNextId());
        }
        ByteBuf buf = this.m_codec.encode(tree);
        int size = buf.readableBytes();
        channel.channel().writeAndFlush((Object)buf);
        if (this.m_statistics != null) {
            this.m_statistics.onBytes(size);
        }
    }

    private boolean shouldMerge(MessageQueue queue) {
        MessageTree tree = queue.peek();
        if (tree != null) {
            long firstTime = tree.getMessage().getTimestamp();
            if (System.currentTimeMillis() - firstTime > 30000L || queue.size() >= 200) {
                return true;
            }
        }
        return false;
    }

    @Override
    public void shutdown() {
        this.m_active = false;
        this.m_channelManager.shutdown();
    }
}

