/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.instrumentation;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.FlumeException;
import org.apache.flume.api.HostInfo;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MonitorService} that periodically polls the JMX metrics exposed via
 * {@link JMXPollUtil#getAllMBeans()} and pushes every attribute to a configured
 * list of Ganglia hosts over UDP.
 *
 * <p>Messages are assembled in {@link #buffer} using the XDR primitives
 * {@link #xdr_int(int)} and {@link #xdr_string(String)} and flushed with
 * {@link #sendToGangliaNodes()}. Two message layouts are supported, selected by
 * the {@code isGanglia3} setting: {@link #createGangliaMessage(String, String)}
 * and {@link #createGangliaMessage31(String, String)}.
 *
 * <p>Configuration keys read by {@link #configure(Context)}: {@code hosts}
 * (mandatory, comma separated {@code host:port} pairs), {@code pollFrequency}
 * (seconds, default 60) and {@code isGanglia3} (default {@code false}).
 */
public class GangliaServer
implements MonitorService {
    private static final Logger logger = LoggerFactory.getLogger(GangliaServer.class);
    /** Size in bytes of the buffer a single outgoing message is assembled in. */
    public static final int BUFFER_SIZE = 1500;
    /**
     * Charset used to encode strings onto the wire. Fixed so that the bytes sent
     * do not depend on the platform default charset of the machine running the agent.
     */
    private static final java.nio.charset.Charset XDR_CHARSET = java.nio.charset.Charset.forName("UTF-8");
    /** Scratch buffer holding the message currently being assembled. */
    protected byte[] buffer = new byte[BUFFER_SIZE];
    /** Number of valid bytes in {@link #buffer}; also the next write position. */
    protected int offset;
    private final List<SocketAddress> addresses = new ArrayList<SocketAddress>();
    private DatagramSocket socket = null;
    private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    private List<HostInfo> hosts;
    protected final GangliaCollector collectorRunnable = new GangliaCollector();
    // Same value as DEFAULT_POLL_FREQUENCY, which is declared further down.
    private int pollFrequency = 60;
    public static final String DEFAULT_UNITS = "";
    public static final int DEFAULT_TMAX = 60;
    public static final int DEFAULT_DMAX = 0;
    public static final int DEFAULT_SLOPE = 3;
    public static final String GANGLIA_DOUBLE_TYPE = "double";
    private volatile boolean isGanglia3 = false;
    private String hostname;
    public final String CONF_POLL_FREQUENCY = "pollFrequency";
    public final int DEFAULT_POLL_FREQUENCY = 60;
    public final String CONF_HOSTS = "hosts";
    public final String CONF_ISGANGLIA3 = "isGanglia3";
    private static final String GANGLIA_CONTEXT = "flume.";

    /**
     * Appends a string to the buffer: a 4-byte length, the encoded bytes, then
     * zero padding up to the next multiple of four.
     *
     * @param s the string to append; must not be {@code null}
     * @throws ArrayIndexOutOfBoundsException if the encoded string does not fit in
     *         the remaining buffer space; the buffer is left untouched in that case
     */
    protected void xdr_string(String s) {
        byte[] bytes = s.getBytes(XDR_CHARSET);
        int len = bytes.length;
        // Check the whole footprint (length prefix + payload + padding) up front so a
        // string that is too long cannot leave a half-written entry in the buffer.
        int end = (this.offset + 4 + len + 3) / 4 * 4;
        if (end > this.buffer.length) {
            throw new ArrayIndexOutOfBoundsException("Ganglia message too large: need " + end
                    + " bytes but buffer holds " + this.buffer.length);
        }
        this.xdr_int(len);
        System.arraycopy(bytes, 0, this.buffer, this.offset, len);
        this.offset += len;
        this.pad();
    }

    /** Writes zero bytes until {@link #offset} is a multiple of four. */
    private void pad() {
        int newOffset = (this.offset + 3) / 4 * 4;
        while (this.offset < newOffset) {
            this.buffer[this.offset++] = 0;
        }
    }

    /**
     * Appends an integer to the buffer as four bytes, most significant byte first.
     *
     * @param i the value to append
     */
    protected void xdr_int(int i) {
        this.buffer[this.offset++] = (byte)(i >> 24 & 0xFF);
        this.buffer[this.offset++] = (byte)(i >> 16 & 0xFF);
        this.buffer[this.offset++] = (byte)(i >> 8 & 0xFF);
        this.buffer[this.offset++] = (byte)(i & 0xFF);
    }

    /**
     * Sends the first {@link #offset} bytes of {@link #buffer} as one datagram to
     * every configured address and then resets {@link #offset} to zero.
     *
     * <p>A failure to send to one address is logged and does not prevent sending
     * to the remaining addresses.
     */
    public synchronized void sendToGangliaNodes() {
        for (SocketAddress addr : this.addresses) {
            try {
                DatagramPacket packet = new DatagramPacket(this.buffer, this.offset, addr);
                this.socket.send(packet);
            }
            catch (Exception ex) {
                logger.warn("Could not send metrics to metrics server: " + addr, ex);
            }
        }
        this.offset = 0;
    }

    /**
     * Opens the UDP socket, resolves the local host name, resolves the configured
     * hosts into socket addresses and schedules the collector to run every
     * {@code pollFrequency} seconds, starting immediately.
     *
     * @throws FlumeException if the socket cannot be created
     */
    @Override
    public void start() {
        try {
            this.socket = new DatagramSocket();
            this.hostname = InetAddress.getLocalHost().getHostName();
        }
        catch (SocketException ex) {
            logger.error("Could not create socket for metrics collection.");
            throw new FlumeException("Could not create socket for metrics collection.", ex);
        }
        catch (Exception ex2) {
            logger.warn("Unknown error occured", ex2);
        }
        if (this.hostname == null) {
            // Host name lookup failed. Without a value the 3.1 message builder would
            // fail on a null string and the 3.0 builder would emit names prefixed
            // with "null.", so fall back to a fixed placeholder instead.
            this.hostname = "localhost";
            logger.warn("Could not determine local host name; reporting metrics as '{}'", this.hostname);
        }
        for (HostInfo host : this.hosts) {
            this.addresses.add(new InetSocketAddress(host.getHostName(), host.getPortNumber()));
        }
        this.collectorRunnable.server = this;
        // An executor that was shut down by stop() cannot be reused; create a fresh one.
        if (this.service.isShutdown() || this.service.isTerminated()) {
            this.service = Executors.newSingleThreadScheduledExecutor();
        }
        this.service.scheduleWithFixedDelay(this.collectorRunnable, 0L, this.pollFrequency, TimeUnit.SECONDS);
    }

    /**
     * Shuts the collector down, waits for it to terminate, forgets the resolved
     * addresses and closes the UDP socket.
     *
     * <p>If the calling thread is interrupted while waiting, the executor is asked
     * to stop immediately, the wait continues until it has terminated, and the
     * thread's interrupt status is restored before returning.
     */
    @Override
    public void stop() {
        this.service.shutdown();
        boolean interrupted = false;
        while (!this.service.isTerminated()) {
            try {
                logger.warn("Waiting for ganglia service to stop");
                this.service.awaitTermination(500L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ex) {
                logger.warn("Interrupted while waiting for ganglia monitor to shutdown", ex);
                this.service.shutdownNow();
                interrupted = true;
            }
        }
        this.addresses.clear();
        // start() opens a new socket every time, so the old one must be released here.
        if (this.socket != null) {
            this.socket.close();
            this.socket = null;
        }
        if (interrupted) {
            // Re-assert the interrupt only after the wait loop so the loop itself can finish.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @param pollFrequency delay in seconds between two collector runs; takes
     *        effect the next time {@link #start()} is called
     */
    public void setPollFrequency(int pollFrequency) {
        this.pollFrequency = pollFrequency;
    }

    /** @return the delay in seconds between two collector runs */
    public int getPollFrequency() {
        return this.pollFrequency;
    }

    /**
     * @param isGanglia3 {@code true} to build messages with
     *        {@link #createGangliaMessage(String, String)}, {@code false} to use
     *        {@link #createGangliaMessage31(String, String)}
     */
    public void setIsGanglia3(boolean isGanglia3) {
        this.isGanglia3 = isGanglia3;
    }

    /** @return whether the Ganglia 3 message layout is selected */
    public boolean isGanglia3() {
        return this.isGanglia3;
    }

    /**
     * Classifies a metric value for the wire: {@code "float"} when it parses as a
     * float, {@code "string"} otherwise.
     */
    private static String metricType(String value) {
        try {
            Float.parseFloat(value);
            return "float";
        }
        catch (NumberFormatException ignored) {
            // Not numeric: report the value as a plain string.
            return "string";
        }
    }

    /**
     * Assembles a Ganglia 3 formatted message for one metric in {@link #buffer}.
     * The message is not sent; the caller must invoke {@link #sendToGangliaNodes()}.
     *
     * @param name metric name; the local host name and a dot are prepended
     * @param value metric value as text
     */
    protected void createGangliaMessage(String name, String value) {
        logger.debug("Sending ganglia3 formatted message.{}: {}", name, value);
        name = this.hostname + "." + name;
        this.xdr_int(0);
        this.xdr_string(metricType(value));
        this.xdr_string(name);
        this.xdr_string(value);
        this.xdr_string(DEFAULT_UNITS);
        this.xdr_int(DEFAULT_SLOPE);
        this.xdr_int(DEFAULT_TMAX);
        this.xdr_int(DEFAULT_DMAX);
    }

    /**
     * Builds and sends a Ganglia 3.1 formatted metadata message for one metric,
     * then assembles the matching value message in {@link #buffer}. The value
     * message is not sent; the caller must invoke {@link #sendToGangliaNodes()}.
     *
     * @param name metric name
     * @param value metric value as text
     */
    protected void createGangliaMessage31(String name, String value) {
        logger.debug("Sending ganglia 3.1 formatted message: {}: {}", name, value);
        // First packet. NOTE(review): 128 is presumably the Ganglia 3.1 metadata
        // message id and the 0 after the name the "spoof" flag - confirm against
        // the Ganglia protocol definition.
        this.xdr_int(128);
        this.xdr_string(this.hostname);
        this.xdr_string(name);
        this.xdr_int(0);
        this.xdr_string(metricType(value));
        this.xdr_string(name);
        this.xdr_string(DEFAULT_UNITS);
        this.xdr_int(DEFAULT_SLOPE);
        this.xdr_int(DEFAULT_TMAX);
        this.xdr_int(DEFAULT_DMAX);
        // One extra key/value pair follows: GROUP=flume.
        this.xdr_int(1);
        this.xdr_string("GROUP");
        this.xdr_string("flume");
        this.sendToGangliaNodes();
        // Second packet carrying the value itself. NOTE(review): 133 is presumably
        // the Ganglia 3.1 string-value message id - confirm.
        this.xdr_int(133);
        this.xdr_string(this.hostname);
        this.xdr_string(name);
        this.xdr_int(0);
        this.xdr_string("%s");
        this.xdr_string(value);
    }

    /**
     * Reads the poll frequency, the host list and the protocol version flag from
     * the given context.
     *
     * @param context the configuration to read from
     * @throws ConfigurationException if the {@code hosts} key is missing or empty
     * @throws FlumeException if none of the configured hosts is valid
     */
    @Override
    public void configure(Context context) {
        this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, this.DEFAULT_POLL_FREQUENCY);
        String localHosts = context.getString(this.CONF_HOSTS);
        if (localHosts == null || localHosts.isEmpty()) {
            throw new ConfigurationException("Hosts list cannot be empty.");
        }
        this.hosts = this.getHostsFromString(localHosts);
        this.isGanglia3 = context.getBoolean(this.CONF_ISGANGLIA3, false);
    }

    /**
     * Parses a comma separated list of {@code host:port} entries. Entries that are
     * malformed are logged and skipped. Each accepted entry is given the reference
     * name {@code ganglia_host-N}, where N counts accepted entries from zero.
     *
     * @param hosts the raw configuration value
     * @return the parsed hosts, never empty
     * @throws FlumeException if no entry could be parsed
     */
    private List<HostInfo> getHostsFromString(String hosts) throws FlumeException {
        List<HostInfo> hostInfoList = new ArrayList<HostInfo>();
        int i = 0;
        for (String rawHost : hosts.split(",")) {
            // Tolerate whitespace around entries, e.g. "a:8649, b:8649".
            String host = rawHost.trim();
            String[] hostAndPort = host.split(":");
            if (hostAndPort.length < 2) {
                logger.warn("Invalid ganglia host: {}", host);
                continue;
            }
            try {
                hostInfoList.add(new HostInfo("ganglia_host-" + i,
                        hostAndPort[0].trim(), Integer.parseInt(hostAndPort[1].trim())));
                // Advance only after a successful add so reference names stay unique
                // and contiguous.
                i++;
            }
            catch (Exception e) {
                logger.warn("Invalid ganglia host: " + host, e);
            }
        }
        if (hostInfoList.isEmpty()) {
            throw new FlumeException("No valid ganglia hosts defined!");
        }
        return hostInfoList;
    }

    /**
     * Scheduled task that reads all JMX metrics and sends each attribute of each
     * component to the Ganglia hosts through the owning {@link GangliaServer}.
     */
    protected class GangliaCollector
    implements Runnable {
        private GangliaServer server;

        protected GangliaCollector() {
        }

        /**
         * Sends one message per metric attribute, named
         * {@code flume.<component>.<attribute>}. Any error is logged and swallowed
         * so that it cannot escape into the executor running this task.
         */
        @Override
        public void run() {
            try {
                Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
                for (Map.Entry<String, Map<String, String>> component : metricsMap.entrySet()) {
                    for (Map.Entry<String, String> attribute : component.getValue().entrySet()) {
                        String metricName = GangliaServer.GANGLIA_CONTEXT + component.getKey() + "." + attribute.getKey();
                        if (GangliaServer.this.isGanglia3) {
                            this.server.createGangliaMessage(metricName, attribute.getValue());
                        } else {
                            this.server.createGangliaMessage31(metricName, attribute.getValue());
                        }
                        this.server.sendToGangliaNodes();
                    }
                }
            }
            catch (Throwable t) {
                logger.error("Unexpected error", t);
                if (this.server != null) {
                    // Drop any partially assembled message so the next poll starts from
                    // an empty buffer instead of appending to stale bytes.
                    this.server.offset = 0;
                }
            }
        }
    }
}

