/*
 * Decompiled with CFR 0.152.
 */
package com.github.kristofa.flume;

import com.github.kristofa.flume.MetricReporter;
import com.github.kristofa.flume.MetricReporterBuilder;
import com.twitter.zipkin.gen.Annotation;
import com.twitter.zipkin.gen.LogEntry;
import com.twitter.zipkin.gen.Span;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.codec.binary.Base64;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.sink.AbstractSink;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZipkinMetricsSink
extends AbstractSink
implements Configurable {
    private static final String SCRIBE_CATEGORY = "category";
    private static final Logger LOGGER = LoggerFactory.getLogger(ZipkinMetricsSink.class);
    private static final int DEFAULT_BATCH_SIZE = 25;
    private static final String BATCH_SIZE_PROP_NAME = "batchsize";
    private SinkCounter sinkCounter;
    private int batchSize = 25;
    private LifecycleState lifeCycleState;
    private final MetricReporterBuilder metricReporterBuilder = new MetricReporterBuilder();
    private MetricReporter metricReporter;

    public synchronized void start() {
        super.start();
        this.lifeCycleState = LifecycleState.START;
        this.sinkCounter.start();
        this.metricReporter = this.metricReporterBuilder.build();
    }

    public synchronized void stop() {
        LOGGER.info("Stopping ZipkinGraphiteSink.");
        this.metricReporter.close();
        this.lifeCycleState = LifecycleState.STOP;
        this.sinkCounter.stop();
        super.stop();
        LOGGER.info("ZipkinGraphiteSink stopped. Metrics:{}", (Object)this.sinkCounter);
    }

    public synchronized LifecycleState getLifecycleState() {
        return this.lifeCycleState;
    }

    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = Sink.Status.BACKOFF;
        Channel channel = this.getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event != null) {
                this.process(this.create(event));
                int count = 1;
                while (count < this.batchSize && (event = channel.take()) != null) {
                    ++count;
                    try {
                        this.process(this.create(event));
                    }
                    catch (TException e) {
                        LOGGER.warn("We were unable to build spans from received data...", (Throwable)e);
                    }
                }
                this.sinkCounter.incrementBatchCompleteCount();
                status = Sink.Status.READY;
            } else {
                this.sinkCounter.incrementBatchEmptyCount();
            }
            txn.commit();
        }
        catch (IOException e) {
            txn.rollback();
            LOGGER.warn("IOException", (Throwable)e);
        }
        catch (Throwable e) {
            txn.rollback();
            throw new EventDeliveryException(e);
        }
        finally {
            txn.close();
        }
        return status;
    }

    public void configure(Context context) {
        this.batchSize = context.getInteger(BATCH_SIZE_PROP_NAME, Integer.valueOf(25));
        this.metricReporterBuilder.configure(context);
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(this.getName());
        }
        LOGGER.info("batchsize: {}", (Object)this.batchSize);
    }

    private LogEntry create(Event event) {
        byte[] body = event.getBody();
        LogEntry logEntry = new LogEntry();
        logEntry.setCategory((String)event.getHeaders().get(SCRIBE_CATEGORY));
        logEntry.setMessage(new String(body));
        return logEntry;
    }

    private void process(LogEntry logEntry) throws TException, IOException {
        Base64 base64 = new Base64();
        byte[] decodedSpan = base64.decode(logEntry.getMessage());
        ByteArrayInputStream buf = new ByteArrayInputStream(decodedSpan);
        TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
        TProtocol proto = factory.getProtocol((TTransport)new TIOStreamTransport((InputStream)buf));
        Span span = new Span();
        span.read(proto);
        for (Annotation annotation : span.getAnnotations()) {
            int duration = annotation.getDuration();
            if (duration <= 0) continue;
            String value = annotation.getValue();
            int equalSignIndex = value.indexOf("=");
            String metricName = null;
            metricName = equalSignIndex > -1 ? annotation.getValue().substring(0, equalSignIndex) : annotation.getValue();
            this.metricReporter.update(metricName, duration /= 1000);
        }
    }
}

