/*
 * Decompiled with CFR 0.152.
 */
package com.datadoghq.trace.writer;

import com.datadoghq.trace.DDBaseSpan;
import com.datadoghq.trace.Service;
import com.datadoghq.trace.writer.DDApi;
import com.datadoghq.trace.writer.Writer;
import com.datadoghq.trace.writer.WriterQueue;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Writer} that buffers traces in a {@link WriterQueue} and hands them to a {@link DDApi}
 * from background daemon threads.
 *
 * <p>Two executors are used: a scheduled one that fires a {@link TracesSendingTask} every
 * {@link #FLUSH_TIME_SECONDS} seconds once {@link #start()} has been called, and a single-thread
 * one on which the actual API calls (traces and services) are executed.
 */
public class DDAgentWriter implements Writer {
    private static final Logger log = LoggerFactory.getLogger(DDAgentWriter.class);

    /** Host used by the no-argument constructor. */
    public static final String DEFAULT_HOSTNAME = "localhost";

    /** Port used by the no-argument constructor. */
    public static final int DEFAULT_PORT = 8126;

    /** Capacity of the queue created when the caller does not supply one. */
    static final int DEFAULT_MAX_TRACES = 1000;

    /** How long the flush task waits for one send to complete, in seconds. */
    static final long API_TIMEOUT_SECONDS = 1L;

    /** Period between two flushes of the queue, in seconds. */
    static final long FLUSH_TIME_SECONDS = 1L;

    /** How long {@link #close()} waits for each executor to terminate, in milliseconds. */
    private static final long CLOSE_TIMEOUT_MILLIS = 500L;

    // NOTE: declaration order matters here, both executors are built from the thread factory.
    private final ThreadFactory agentWriterThreadFactory =
            new ThreadFactoryBuilder().setNameFormat("dd-agent-writer-%d").setDaemon(true).build();
    private final ScheduledExecutorService scheduledExecutor =
            Executors.newScheduledThreadPool(1, this.agentWriterThreadFactory);
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(this.agentWriterThreadFactory);

    private final DDApi api;
    private final WriterQueue<List<DDBaseSpan<?>>> traces;

    /**
     * Whether the current "queue is full" episode has already been logged. Read and written
     * without synchronization; it only throttles a debug message.
     */
    private boolean queueFullReported = false;

    /** Creates a writer targeting {@link #DEFAULT_HOSTNAME}:{@link #DEFAULT_PORT}. */
    public DDAgentWriter() {
        this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT));
    }

    /**
     * Creates a writer using the given API and a queue of {@link #DEFAULT_MAX_TRACES} entries.
     *
     * @param api the API the traces and services are sent to
     */
    public DDAgentWriter(DDApi api) {
        this(api, new WriterQueue<List<DDBaseSpan<?>>>(DEFAULT_MAX_TRACES));
    }

    /**
     * Creates a writer using the given API and queue.
     *
     * @param api the API the traces and services are sent to
     * @param queue the buffer holding traces until the next flush
     */
    public DDAgentWriter(DDApi api, WriterQueue<List<DDBaseSpan<?>>> queue) {
        this.api = api;
        this.traces = queue;
    }

    /**
     * Adds a trace to the queue.
     *
     * <p>A non-null value returned by the queue is treated as a trace that was discarded to make
     * room. The overflow is logged once per episode: the flag is re-armed only after an add that
     * discards nothing.
     *
     * @param trace the trace to buffer
     */
    @Override
    public void write(List<DDBaseSpan<?>> trace) {
        List<DDBaseSpan<?>> removed = this.traces.add(trace);
        if (removed == null) {
            this.queueFullReported = false;
            return;
        }
        if (!this.queueFullReported) {
            // The message reports the default capacity, as the queue's real size is not read here.
            log.debug("Queue is full, traces will be discarded, queue size: {}", DEFAULT_MAX_TRACES);
            this.queueFullReported = true;
        }
    }

    /**
     * Sends the services to the API asynchronously on the single-thread executor.
     *
     * @param services the services to send
     */
    @Override
    public void writeServices(final Map<String, Service> services) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                log.debug("Async writer about to write {} services", services.size());
                if (DDAgentWriter.this.api.sendServices(services)) {
                    log.debug("Async writer just sent  {} services", services.size());
                } else {
                    log.warn("Failed for Async writer to send {} services", services.size());
                }
            }
        };
        this.executor.submit(task);
    }

    /** Schedules the periodic flush, starting immediately. */
    @Override
    public void start() {
        this.scheduledExecutor.scheduleAtFixedRate(
                new TracesSendingTask(), 0L, FLUSH_TIME_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Shuts both executors down and waits up to {@link #CLOSE_TIMEOUT_MILLIS} ms for each of them.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt status is restored so that callers can observe it.
     */
    @Override
    public void close() {
        this.scheduledExecutor.shutdownNow();
        this.executor.shutdownNow();
        try {
            this.scheduledExecutor.awaitTermination(CLOSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            this.executor.awaitTermination(CLOSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.info("Writer properly closed and async writer interrupted.");
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public String toString() {
        return "DDAgentWriter { api=" + this.api + " }";
    }

    /**
     * Periodic task: submits a {@link SendingTask} to the single-thread executor and waits at most
     * {@link #API_TIMEOUT_SECONDS} seconds for its result.
     */
    class TracesSendingTask implements Runnable {
        TracesSendingTask() {
        }

        @Override
        public void run() {
            Future<Long> future = DDAgentWriter.this.executor.submit(new SendingTask());
            try {
                long nbTraces = future.get(API_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (nbTraces > 0L) {
                    log.debug("Successfully sent {} traces to the API", nbTraces);
                }
            } catch (TimeoutException e) {
                log.debug("Timeout! Failed to send traces to the API: {}", e.getMessage());
                // Drop the task if it has not started yet so stale flushes do not accumulate
                // behind a slow call; 'false' leaves a send that is already running untouched.
                future.cancel(false);
            } catch (InterruptedException e) {
                log.debug("Failed to send traces to the API: {}", e.getMessage());
                Thread.currentThread().interrupt();
            } catch (Throwable e) {
                // Nothing may escape: an exception thrown from a task scheduled at fixed rate
                // suppresses all its subsequent executions.
                log.debug("Failed to send traces to the API: {}", e.getMessage());
            }
        }

        /** Drains the queue and sends its content to the API in one call. */
        class SendingTask implements Callable<Long> {
            SendingTask() {
            }

            /**
             * @return the number of traces sent, or 0 if the queue was empty or the API reported
             *     a failure
             */
            @Override
            public Long call() throws Exception {
                if (DDAgentWriter.this.traces.isEmpty()) {
                    return 0L;
                }
                List<List<DDBaseSpan<?>>> payload = DDAgentWriter.this.traces.getAll();
                if (log.isDebugEnabled()) {
                    // Span counting is only needed for the message, so skip it when debug is off.
                    int nbSpans = 0;
                    for (List<DDBaseSpan<?>> trace : payload) {
                        nbSpans += trace.size();
                    }
                    log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans);
                }
                boolean isSent = DDAgentWriter.this.api.sendTraces(payload);
                if (!isSent) {
                    log.debug("Failing to send {} traces to the API", payload.size());
                    return 0L;
                }
                // Explicit widening: an int cannot be boxed directly to Long.
                return (long) payload.size();
            }
        }
    }
}

