/*
 * Decompiled with CFR 0.152.
 */
package ddtrot.dd.trace.common.metrics;

import ddtrot.dd.communication.http.OkHttpUtils;
import ddtrot.dd.trace.common.metrics.EventListener;
import ddtrot.dd.trace.common.metrics.Sink;
import ddtrot.dd.trace.util.AgentTaskScheduler;
import ddtrot.okhttp3.HttpUrl;
import ddtrot.okhttp3.OkHttpClient;
import ddtrot.okhttp3.Request;
import ddtrot.okhttp3.RequestBody;
import ddtrot.okhttp3.Response;
import ddtrot.org.jctools.queues.SpscArrayQueue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OkHttpSink
implements Sink,
EventListener {
    private static final Logger log = LoggerFactory.getLogger(OkHttpSink.class);
    private static final long ASYNC_THRESHOLD_LATENCY = TimeUnit.SECONDS.toNanos(1L);
    private final OkHttpClient client;
    private final HttpUrl metricsUrl;
    private final List<EventListener> listeners;
    private final SpscArrayQueue<Request> enqueuedRequests = new SpscArrayQueue(10);
    private final AtomicLong lastRequestTime = new AtomicLong();
    private final AtomicLong asyncRequestCounter = new AtomicLong();
    private final boolean bufferingEnabled;
    private final boolean compressionEnabled;
    private final Map<String, String> headers;
    private final AtomicBoolean asyncTaskStarted = new AtomicBoolean(false);
    private volatile AgentTaskScheduler.Scheduled<OkHttpSink> future;

    public OkHttpSink(OkHttpClient client, String agentUrl, String path, boolean bufferingEnabled, boolean compressionEnabled, Map<String, String> headers) {
        this.client = client;
        this.metricsUrl = HttpUrl.get(agentUrl).resolve(path);
        this.listeners = new CopyOnWriteArrayList<EventListener>();
        this.bufferingEnabled = bufferingEnabled;
        this.compressionEnabled = compressionEnabled;
        this.headers = new HashMap<String, String>(headers);
        if (compressionEnabled) {
            this.headers.put("Content-Encoding", "gzip");
        }
    }

    @Override
    public void accept(int messageCount, ByteBuffer buffer) {
        if (!this.bufferingEnabled || this.lastRequestTime.get() < ASYNC_THRESHOLD_LATENCY) {
            this.send(OkHttpUtils.prepareRequest(this.metricsUrl, this.headers).post(this.makeRequestBody(buffer)).build());
            AgentTaskScheduler.Scheduled<OkHttpSink> future = this.future;
            if (future != null && this.enqueuedRequests.isEmpty()) {
                future.cancel();
                this.asyncTaskStarted.set(false);
            }
        } else {
            if (this.asyncTaskStarted.compareAndSet(false, true)) {
                this.future = AgentTaskScheduler.INSTANCE.scheduleAtFixedRate(new Sender(this.enqueuedRequests), this, 1L, 1L, TimeUnit.SECONDS);
            }
            this.sendAsync(messageCount, buffer);
        }
    }

    private RequestBody makeRequestBody(ByteBuffer buffer) {
        if (this.compressionEnabled) {
            return OkHttpUtils.gzippedMsgpackRequestBodyOf(Collections.singletonList(buffer));
        }
        return OkHttpUtils.msgpackRequestBodyOf(Collections.singletonList(buffer));
    }

    private void sendAsync(int messageCount, ByteBuffer buffer) {
        this.asyncRequestCounter.getAndIncrement();
        if (!this.enqueuedRequests.offer(OkHttpUtils.prepareRequest(this.metricsUrl, this.headers).post(this.makeRequestBody(buffer.duplicate())).build())) {
            log.debug("dropping payload of {} and {}B because sending queue was full", (Object)messageCount, (Object)buffer.limit());
        }
    }

    public boolean isInDegradedMode() {
        return this.asyncTaskStarted.get();
    }

    public long asyncRequestCount() {
        return this.asyncRequestCounter.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void send(Request request) {
        long start = System.nanoTime();
        try (Response response = this.client.newCall(request).execute();){
            if (!response.isSuccessful()) {
                this.handleFailure(response);
            } else {
                this.onEvent(EventListener.EventType.OK, "");
            }
        }
        catch (IOException e) {
            this.onEvent(EventListener.EventType.ERROR, e.getMessage());
        }
        finally {
            this.lastRequestTime.set(System.nanoTime() - start);
        }
    }

    @Override
    public void onEvent(EventListener.EventType eventType, String message) {
        for (EventListener listener : this.listeners) {
            listener.onEvent(eventType, message);
        }
    }

    @Override
    public void register(EventListener listener) {
        this.listeners.add(listener);
    }

    private void handleFailure(Response response) throws IOException {
        int code = response.code();
        if (code == 404) {
            this.onEvent(EventListener.EventType.DOWNGRADED, "could not find endpoint");
        } else if (code >= 400 && code < 500) {
            this.onEvent(EventListener.EventType.BAD_PAYLOAD, response.body().string());
        } else if (code >= 500) {
            this.onEvent(EventListener.EventType.ERROR, response.body().string());
        }
    }

    private static final class Sender
    implements AgentTaskScheduler.Task<OkHttpSink> {
        private final SpscArrayQueue<Request> inbox;

        private Sender(SpscArrayQueue<Request> inbox) {
            this.inbox = inbox;
        }

        @Override
        public void run(OkHttpSink target) {
            Request request;
            while ((request = this.inbox.poll()) != null) {
                target.send(request);
            }
        }
    }
}

