/*
 * Decompiled with CFR 0.152.
 */
package datadog.trace.common.metrics;

import datadog.communication.http.OkHttpUtils;
import datadog.trace.common.metrics.EventListener;
import datadog.trace.common.metrics.Sink;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.util.AgentTaskScheduler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OkHttpSink
implements Sink,
EventListener {
    private static final Logger log = LoggerFactory.getLogger(OkHttpSink.class);
    private static final Map<String, String> HEADERS = Collections.singletonMap("Datadog-Meta-Tracer-Version", DDTraceCoreInfo.VERSION);
    private final OkHttpClient client;
    private final HttpUrl metricsUrl;
    private final List<EventListener> listeners;
    private final SpscArrayQueue<Request> enqueuedRequests = new SpscArrayQueue(10);
    private final AtomicLong lastRequestTime = new AtomicLong();
    private final AtomicLong asyncRequestCounter = new AtomicLong();
    private final long asyncThresholdLatency;
    private final boolean bufferingEnabled;
    private final AtomicBoolean asyncTaskStarted = new AtomicBoolean(false);
    private volatile AgentTaskScheduler.Scheduled<OkHttpSink> future;

    public OkHttpSink(String agentUrl, long timeoutMillis, boolean bufferingEnabled) {
        this(OkHttpUtils.buildHttpClient((HttpUrl)HttpUrl.get((String)agentUrl), (long)timeoutMillis), agentUrl, "v0.6/stats", bufferingEnabled);
    }

    public OkHttpSink(OkHttpClient client, String agentUrl, String path, boolean bufferingEnabled) {
        this(client, agentUrl, path, TimeUnit.SECONDS.toNanos(1L), bufferingEnabled);
    }

    public OkHttpSink(OkHttpClient client, String agentUrl, String path, long asyncThresholdLatency, boolean bufferingEnabled) {
        this.client = client;
        this.metricsUrl = HttpUrl.get((String)agentUrl).resolve(path);
        this.listeners = new CopyOnWriteArrayList<EventListener>();
        this.asyncThresholdLatency = asyncThresholdLatency;
        this.bufferingEnabled = bufferingEnabled;
    }

    public void accept(int messageCount, ByteBuffer buffer) {
        if (!this.bufferingEnabled || this.lastRequestTime.get() < this.asyncThresholdLatency) {
            this.send(OkHttpUtils.prepareRequest((HttpUrl)this.metricsUrl, HEADERS).put(OkHttpUtils.msgpackRequestBodyOf(Collections.singletonList(buffer))).build());
            AgentTaskScheduler.Scheduled<OkHttpSink> future = this.future;
            if (future != null && this.enqueuedRequests.isEmpty()) {
                future.cancel();
                this.asyncTaskStarted.set(false);
            }
        } else {
            if (this.asyncTaskStarted.compareAndSet(false, true)) {
                this.future = AgentTaskScheduler.INSTANCE.scheduleAtFixedRate(new Sender(this.enqueuedRequests), this, 1L, 1L, TimeUnit.SECONDS);
            }
            this.sendAsync(messageCount, buffer);
        }
    }

    private void sendAsync(int messageCount, ByteBuffer buffer) {
        this.asyncRequestCounter.getAndIncrement();
        if (!this.enqueuedRequests.offer((Object)OkHttpUtils.prepareRequest((HttpUrl)this.metricsUrl, HEADERS).put(OkHttpUtils.msgpackRequestBodyOf(Collections.singletonList(buffer.duplicate()))).build())) {
            log.debug("dropping payload of {} and {}B because sending queue was full", (Object)messageCount, (Object)buffer.limit());
        }
    }

    public boolean isInDegradedMode() {
        return this.asyncTaskStarted.get();
    }

    public long asyncRequestCount() {
        return this.asyncRequestCounter.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void send(Request request) {
        long start = System.nanoTime();
        try (Response response = this.client.newCall(request).execute();){
            if (!response.isSuccessful()) {
                this.handleFailure(response);
            } else {
                this.onEvent(EventListener.EventType.OK, "");
            }
        }
        catch (IOException e) {
            this.onEvent(EventListener.EventType.ERROR, e.getMessage());
        }
        finally {
            this.lastRequestTime.set(System.nanoTime() - start);
        }
    }

    @Override
    public void onEvent(EventListener.EventType eventType, String message) {
        for (EventListener listener : this.listeners) {
            listener.onEvent(eventType, message);
        }
    }

    @Override
    public void register(EventListener listener) {
        this.listeners.add(listener);
    }

    private void handleFailure(Response response) throws IOException {
        int code = response.code();
        if (code == 404) {
            this.onEvent(EventListener.EventType.DOWNGRADED, "could not find endpoint");
        } else if (code >= 400 && code < 500) {
            this.onEvent(EventListener.EventType.BAD_PAYLOAD, response.body().string());
        } else if (code >= 500) {
            this.onEvent(EventListener.EventType.ERROR, response.body().string());
        }
    }

    private static final class Sender
    implements AgentTaskScheduler.Task<OkHttpSink> {
        private final SpscArrayQueue<Request> inbox;

        private Sender(SpscArrayQueue<Request> inbox) {
            this.inbox = inbox;
        }

        @Override
        public void run(OkHttpSink target) {
            Request request;
            while ((request = (Request)this.inbox.poll()) != null) {
                target.send(request);
            }
        }
    }
}

