/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spectator.atlas.impl;

import com.netflix.spectator.api.NoopRegistry;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.atlas.AtlasConfig;
import com.netflix.spectator.atlas.AtlasRegistry;
import com.netflix.spectator.atlas.Publisher;
import com.netflix.spectator.atlas.impl.EvalPayload;
import com.netflix.spectator.atlas.impl.GzipLevelOutputStream;
import com.netflix.spectator.atlas.impl.JsonUtils;
import com.netflix.spectator.atlas.impl.PublishPayload;
import com.netflix.spectator.atlas.impl.ValidationHelper;
import com.netflix.spectator.atlas.shaded.spectator-atlas.json.core.JsonFactory;
import com.netflix.spectator.atlas.shaded.spectator-atlas.json.databind.ObjectMapper;
import com.netflix.spectator.atlas.shaded.spectator-atlas.json.dataformat.smile.SmileFactory;
import com.netflix.spectator.impl.StreamHelper;
import com.netflix.spectator.ipc.http.HttpClient;
import com.netflix.spectator.ipc.http.HttpResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Publisher} implementation that posts payloads over HTTP from a fixed-size pool of
 * daemon threads.
 *
 * <p>Publish payloads are serialized with the Smile mapper, gzip-compressed and posted to the
 * configured publish URI. Eval payloads are serialized as JSON and posted to the configured
 * eval URI. The sender pool only exists between {@link #init()} and {@link #close()}; both
 * {@code publish} overloads hand their work to that pool.
 */
public final class DefaultPublisher implements Publisher {
    private static final String CLOCK_SKEW_TIMER = "spectator.atlas.clockSkew";
    private static final Logger LOGGER = LoggerFactory.getLogger(AtlasRegistry.class);
    private final StreamHelper streamHelper = new StreamHelper();
    private final URI uri;
    private final URI evalUri;
    private final int connectTimeout;
    private final int readTimeout;
    private final int numThreads;
    private final Registry debugRegistry;
    private final HttpClient client;
    private final ObjectMapper jsonMapper;
    private final ObjectMapper smileMapper;
    private final ValidationHelper validationHelper;
    // Created by init() and cleared by close(); null outside of that window.
    private ExecutorService senderPool;

    /**
     * Creates a publisher that builds its own HTTP client and uses the debug registry
     * supplied by the config.
     *
     * @param config source of URIs, timeouts, thread count and valid tag characters
     */
    public DefaultPublisher(AtlasConfig config) {
        this(config, null);
    }

    /**
     * Creates a publisher that uses the debug registry supplied by the config.
     *
     * @param config source of URIs, timeouts, thread count and valid tag characters
     * @param client HTTP client to use, or {@code null} to create one
     */
    public DefaultPublisher(AtlasConfig config, HttpClient client) {
        this(config, client, config.debugRegistry());
    }

    /**
     * Creates a publisher.
     *
     * @param config   source of URIs, timeouts, thread count and valid tag characters
     * @param client   HTTP client to use, or {@code null} to create one bound to the debug registry
     * @param registry registry used for the clock, the clock skew timer and validation results;
     *                 {@code null} is replaced by a {@link NoopRegistry}
     */
    public DefaultPublisher(AtlasConfig config, HttpClient client, Registry registry) {
        this.uri = URI.create(config.uri());
        this.evalUri = URI.create(config.evalUri());
        // Clamp instead of a plain narrowing cast so an oversized timeout cannot wrap around.
        this.connectTimeout = clampToInt(config.connectTimeout().toMillis());
        this.readTimeout = clampToInt(config.readTimeout().toMillis());
        this.numThreads = config.numThreads();
        this.debugRegistry = registry != null ? registry : new NoopRegistry();
        this.client = client != null ? client : HttpClient.create(this.debugRegistry);
        Function<String, String> replacementFunc = JsonUtils.createReplacementFunction(config.validTagCharacters());
        this.jsonMapper = JsonUtils.createMapper(new JsonFactory(), replacementFunc);
        this.smileMapper = JsonUtils.createMapper(new SmileFactory(), replacementFunc);
        this.validationHelper = new ValidationHelper(LOGGER, this.jsonMapper, this.debugRegistry);
    }

    /** Narrows {@code value} to an {@code int}, saturating at the {@code int} bounds. */
    private static int clampToInt(long value) {
        return (int) Math.max((long) Integer.MIN_VALUE, Math.min((long) Integer.MAX_VALUE, value));
    }

    /**
     * Creates the pool of daemon threads used to send payloads. Calling this again while a
     * pool is already active is a no-op, so the existing threads are never orphaned.
     */
    @Override
    public void init() {
        if (this.senderPool != null) {
            return;
        }
        AtomicInteger next = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "spectator-atlas-publish-" + next.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        this.senderPool = Executors.newFixedThreadPool(this.numThreads, factory);
    }

    /**
     * Serializes the payload with the Smile mapper and gzip-compresses it.
     *
     * @param payload batch to encode
     * @return compressed bytes, copied out of the buffer obtained from the stream helper
     * @throws IOException if serialization or compression fails
     */
    private byte[] encodeBatch(PublishPayload payload) throws IOException {
        ByteArrayOutputStream baos = this.streamHelper.getOrCreateStream();
        // The gzip stream must be closed before reading the buffer so the output is complete.
        try (GzipLevelOutputStream out = new GzipLevelOutputStream(baos)) {
            this.smileMapper.writeValue(out, payload);
        }
        return baos.toByteArray();
    }

    /**
     * Records the difference between the local wall clock and the server timestamp on the
     * clock skew timer, tagged {@code id=fast} when the local clock is at or ahead of the
     * server and {@code id=slow} when it is behind.
     *
     * @param responseTimestamp server time in epoch milliseconds, or {@code 0} when the
     *                          response carried no date
     */
    private void recordClockSkew(long responseTimestamp) {
        if (responseTimestamp == 0L) {
            LOGGER.debug("no date timestamp on response, cannot record skew");
            return;
        }
        long delta = this.debugRegistry.clock().wallTime() - responseTimestamp;
        if (delta >= 0L) {
            this.debugRegistry.timer(CLOCK_SKEW_TIMER, "id", "fast").record(delta, TimeUnit.MILLISECONDS);
        } else {
            // Timers are given a non-negative amount; the direction is carried by the tag.
            this.debugRegistry.timer(CLOCK_SKEW_TIMER, "id", "slow").record(-delta, TimeUnit.MILLISECONDS);
        }
        LOGGER.debug("clock skew between client and server: {}ms", delta);
    }

    /**
     * Sends a batch of metrics to the publish URI on the sender pool.
     *
     * <p>Any exception raised while sending is logged and counted as dropped through the
     * validation helper; it is not propagated through the returned future.
     *
     * @param payload metrics to send
     * @return future that completes when the send attempt has finished
     */
    @Override
    public CompletableFuture<Void> publish(PublishPayload payload) {
        Runnable task = () -> {
            try {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("publish payload: {}", this.jsonMapper.writeValueAsString(payload));
                }
                HttpResponse res = this.client.post(this.uri)
                    .withConnectTimeout(this.connectTimeout)
                    .withReadTimeout(this.readTimeout)
                    .addHeader("Content-Encoding", "gzip")
                    .withContent("application/x-jackson-smile", this.encodeBatch(payload))
                    .send();
                Instant date = res.dateHeader("Date");
                this.recordClockSkew(date == null ? 0L : date.toEpochMilli());
                this.validationHelper.recordResults(payload.getMetrics().size(), res);
            } catch (Exception e) {
                LOGGER.warn("failed to send metrics (uri={})", this.uri, e);
                this.validationHelper.incrementDroppedHttp(payload.getMetrics().size());
            }
        };
        return CompletableFuture.runAsync(task, this.senderPool);
    }

    /**
     * Sends an eval payload as JSON to the eval URI on the sender pool.
     *
     * <p>Any exception raised while sending is logged; it is not propagated through the
     * returned future.
     *
     * @param payload subscription data to send
     * @return future that completes when the send attempt has finished
     */
    @Override
    public CompletableFuture<Void> publish(EvalPayload payload) {
        Runnable task = () -> {
            try {
                String json = this.jsonMapper.writeValueAsString(payload);
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("eval payload: {}", json);
                }
                this.client.post(this.evalUri)
                    .withConnectTimeout(this.connectTimeout)
                    .withReadTimeout(this.readTimeout)
                    .withJsonContent(json)
                    .send();
            } catch (Exception e) {
                LOGGER.warn("failed to send metrics for subscriptions (uri={})", this.evalUri, e);
            }
        };
        return CompletableFuture.runAsync(task, this.senderPool);
    }

    /**
     * Initiates shutdown of the sender pool, if one exists, and forgets it. This does not
     * wait for tasks that were already submitted to finish.
     *
     * @throws IOException declared by the method signature; this implementation does not throw it
     */
    @Override
    public void close() throws IOException {
        if (this.senderPool != null) {
            this.senderPool.shutdown();
            this.senderPool = null;
        }
    }
}

