/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.tracer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.tracer.thrift.Annotation;
import org.apache.accumulo.tracer.thrift.RemoteSpan;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.TimelineAnnotation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SpanReceiver} that buffers completed spans in an in-memory queue and delivers them
 * asynchronously from a background daemon {@link Timer} thread.
 *
 * <p>Subclasses decide where a span goes ({@link #getSpanKey(Map)}), how to open a connection
 * to that place ({@link #createDestination(Object)}) and how to transmit a span over it
 * ({@link #send(Object, RemoteSpan)}). Opened destinations are cached per key and discarded
 * when a send fails, so the next attempt reconnects.
 *
 * @param <SpanKey> key identifying the destination a span should be delivered to
 * @param <Destination> handle (e.g. a client/connection) used to deliver spans
 */
public abstract class AsyncSpanReceiver<SpanKey, Destination>
implements SpanReceiver {
    private static final Logger log = LoggerFactory.getLogger(AsyncSpanReceiver.class);

    /** Configuration key: delay and period, in milliseconds, of the background send task. */
    public static final String SEND_TIMER_MILLIS = "tracer.send.timer.millis";
    /** Configuration key: queue size above which newly received spans are dropped. */
    public static final String QUEUE_SIZE = "tracer.queue.size";
    /** Configuration key: minimum span duration, in milliseconds, for a span to be queued. */
    public static final String SPAN_MIN_MS = "tracer.span.min.ms";

    /** Minimum interval between two "spans are being dropped" warnings. */
    private static final long DROPPED_SPANS_WARN_INTERVAL_MS = 60000L;

    /** Cache of opened destinations; only touched from {@link #sendSpans()}. */
    private final Map<SpanKey, Destination> clients = new HashMap<>();
    /** Host name recorded as the sender of every queued span. */
    protected String host = null;
    /** Service name recorded on queued spans; when null the span's own process id is used. */
    protected String service = null;
    /** Daemon timer that periodically drains the queue via {@link #sendSpans()}. */
    Timer timer = new Timer("SpanSender", true);
    /** Spans waiting for delivery. Also used as the monitor that {@link #close()} waits on. */
    protected final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<>();
    /** Separately tracked element count of {@link #sendQueue}. */
    protected final AtomicInteger sendQueueSize = new AtomicInteger(0);
    int maxQueueSize = 5000;
    /** Wall-clock time (ms) at which the last dropped-spans warning was logged. */
    long lastNotificationOfDroppedSpans = 0L;
    int minSpanSize = 1;

    /**
     * Opens a destination for the given key.
     *
     * @param var1 the key returned by {@link #getSpanKey(Map)}
     * @return a destination usable with {@link #send(Object, RemoteSpan)}
     * @throws Exception if the destination cannot be created; the span stays queued and
     *         delivery is retried on a later {@link #sendSpans()} call
     */
    protected abstract Destination createDestination(SpanKey var1) throws Exception;

    /**
     * Transmits one span to a destination.
     *
     * @param var1 the destination to send to
     * @param var2 the span to send
     * @throws Exception if sending fails; the cached destination is then discarded and the
     *         span stays queued
     */
    protected abstract void send(Destination var1, RemoteSpan var2) throws Exception;

    /**
     * Derives the destination key from a span's key/value annotations.
     *
     * @param var1 the span's key/value annotations as strings (null if the span had none)
     * @return the destination key; {@link #receiveSpan(Span)} ignores spans for which this
     *         returns null
     */
    protected abstract SpanKey getSpanKey(Map<String, String> var1);

    /**
     * Creates a receiver with default settings and WITHOUT scheduling the background send
     * task; queued spans are only delivered when {@link #sendSpans()} is invoked explicitly.
     */
    AsyncSpanReceiver() {
    }

    /**
     * Creates a receiver configured from {@code conf} and schedules the periodic send task.
     *
     * <p>Reads {@code trace.host}, {@code trace.service}, {@link #QUEUE_SIZE},
     * {@link #SPAN_MIN_MS} and {@link #SEND_TIMER_MILLIS} (default 1000 ms). If no host is
     * configured the local canonical host name is used, or {@code "unknown"} when it cannot
     * be resolved.
     *
     * @param conf the tracing configuration to read settings from
     */
    public AsyncSpanReceiver(HTraceConfiguration conf) {
        this.host = conf.get("trace.host", this.host);
        if (this.host == null) {
            try {
                this.host = InetAddress.getLocalHost().getCanonicalHostName();
            }
            catch (UnknownHostException e) {
                this.host = "unknown";
            }
        }
        this.service = conf.get("trace.service", this.service);
        this.maxQueueSize = conf.getInt(QUEUE_SIZE, this.maxQueueSize);
        this.minSpanSize = conf.getInt(SPAN_MIN_MS, this.minSpanSize);
        int millis = conf.getInt(SEND_TIMER_MILLIS, 1000);
        this.timer.schedule(new TimerTask(){

            @Override
            public void run() {
                try {
                    AsyncSpanReceiver.this.sendSpans();
                }
                catch (Exception ex) {
                    // Never let an exception escape: it would terminate the timer thread
                    // and silently stop all further deliveries.
                    log.warn("Exception sending spans to destination", ex);
                }
            }
        }, millis, (long)millis);
    }

    /**
     * Delivers queued spans in order until the queue is empty or a delivery attempt fails.
     *
     * <p>A span is removed from the queue only after {@link #send(Object, RemoteSpan)}
     * returned normally, so a failed span is retried on the next invocation. Threads blocked
     * in {@link #close()} are notified after every successful delivery.
     */
    protected void sendSpans() {
        RemoteSpan span;
        // Loop on peek() rather than isEmpty() so a concurrently drained queue cannot hand
        // us a null span.
        while ((span = this.sendQueue.peek()) != null) {
            SpanKey dest = this.getSpanKey(span.data);
            Destination client = this.clients.get(dest);
            if (client == null) {
                try {
                    // Use the new destination right away instead of only caching it;
                    // otherwise the span would needlessly wait for the next invocation.
                    client = this.createDestination(dest);
                    if (client != null) {
                        this.clients.put(dest, client);
                    }
                }
                catch (Exception ex) {
                    log.warn("Exception creating connection to span receiver", ex);
                }
            }
            if (client == null) {
                // No destination available; leave the span queued and retry later.
                break;
            }
            try {
                this.send(client, span);
                synchronized (this.sendQueue) {
                    this.sendQueue.remove();
                    this.sendQueue.notifyAll();
                    this.sendQueueSize.decrementAndGet();
                }
            }
            catch (Exception ex) {
                log.warn("Got error sending to " + dest + ", refreshing client", ex);
                // Drop the cached destination so the next attempt creates a fresh one.
                this.clients.remove(dest);
                break;
            }
        }
    }

    /**
     * Decodes a map of UTF-8 encoded keys and values into a map of strings.
     *
     * @param bytesMap the map to decode, may be null
     * @return a new map with decoded entries, or null if {@code bytesMap} is null
     */
    public static Map<String, String> convertToStrings(Map<byte[], byte[]> bytesMap) {
        if (bytesMap == null) {
            return null;
        }
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<byte[], byte[]> bytes : bytesMap.entrySet()) {
            result.put(new String(bytes.getKey(), StandardCharsets.UTF_8), new String(bytes.getValue(), StandardCharsets.UTF_8));
        }
        return result;
    }

    /**
     * Converts htrace timeline annotations into their thrift {@link Annotation} counterparts.
     *
     * @param annotations the annotations to convert, may be null
     * @return a new list in the same order, or null if {@code annotations} is null
     */
    public static List<Annotation> convertToAnnotations(List<TimelineAnnotation> annotations) {
        if (annotations == null) {
            return null;
        }
        List<Annotation> result = new ArrayList<>(annotations.size());
        for (TimelineAnnotation annotation : annotations) {
            result.add(new Annotation(annotation.getTime(), annotation.getMessage()));
        }
        return result;
    }

    /**
     * Queues a finished span for asynchronous delivery.
     *
     * <p>The span is ignored when its duration is below the configured minimum or when
     * {@link #getSpanKey(Map)} yields no destination. It is dropped when the queue already
     * holds more than the configured maximum; a warning about dropped spans is logged at most
     * once per minute.
     *
     * @param s the finished span
     */
    public void receiveSpan(Span s) {
        if (s.getStopTimeMillis() - s.getStartTimeMillis() < (long)this.minSpanSize) {
            return;
        }
        Map<String, String> data = AsyncSpanReceiver.convertToStrings(s.getKVAnnotations());
        SpanKey dest = this.getSpanKey(data);
        if (dest == null) {
            return;
        }
        if (this.sendQueueSize.get() > this.maxQueueSize) {
            long now = System.currentTimeMillis();
            if (now - this.lastNotificationOfDroppedSpans > DROPPED_SPANS_WARN_INTERVAL_MS) {
                log.warn("Tracing spans are being dropped because there are already " + this.maxQueueSize + " spans queued for delivery.\n" + "This does not affect performance, security or data integrity, but distributed tracing information is being lost.");
                this.lastNotificationOfDroppedSpans = now;
            }
            return;
        }
        // Convert the annotations only once we know the span will actually be queued.
        List<Annotation> annotations = AsyncSpanReceiver.convertToAnnotations(s.getTimelineAnnotations());
        String sender = this.service == null ? s.getProcessId() : this.service;
        this.sendQueue.add(new RemoteSpan(this.host, sender, s.getTraceId(), s.getSpanId(), s.getParentId(), s.getStartTimeMillis(), s.getStopTimeMillis(), s.getDescription(), data, annotations));
        this.sendQueueSize.incrementAndGet();
    }

    /**
     * Blocks until every queued span has been delivered or the calling thread is interrupted.
     *
     * <p>This only waits for the queue to drain; it does not cancel the background timer.
     * On interruption the wait is abandoned and the thread's interrupt status is restored so
     * callers can still observe it.
     */
    public void close() {
        synchronized (this.sendQueue) {
            while (!this.sendQueue.isEmpty()) {
                try {
                    // Woken by sendSpans() after each successful delivery.
                    this.sendQueue.wait();
                }
                catch (InterruptedException e) {
                    log.warn("flush interrupted");
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

