/*
 * Decompiled with CFR 0.152.
 */
package io.logz.sender;

import io.logz.sender.FormattedLogMessage;
import io.logz.sender.SenderStatusReporter;
import io.logz.sender.com.bluejeans.common.bigqueue.BigQueue;
import io.logz.sender.com.google.gson.JsonObject;
import io.logz.sender.exceptions.LogzioParameterErrorException;
import io.logz.sender.exceptions.LogzioServerErrorException;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Buffers serialized log lines in a {@link BigQueue} rooted at the configured buffer directory
 * and periodically ships them to the Logz.io listener as newline-separated HTTP POST bodies.
 *
 * <p>Instances are obtained through {@link #getOrCreateSenderByType}, which keeps one sender per
 * log type in a static registry. Call {@link #start()} to schedule the periodic drain and queue
 * garbage-collection tasks, and {@link #stop()} to attempt one last bounded drain.
 */
public class LogzioSender {
    /** Upper bound (3 MiB) on the accumulated message size of a single batch. */
    private static final int MAX_SIZE_IN_BYTES = 0x300000;
    /** Sleep before the first retry; doubled after every failed attempt. */
    public static final int INITIAL_WAIT_BEFORE_RETRY_MS = 2000;
    /** Total number of HTTP attempts made for one batch before giving up. */
    public static final int MAX_RETRIES_ATTEMPTS = 3;
    /** Registry of senders keyed by log type; guarded by the class lock of the factory method. */
    private static final Map<String, LogzioSender> logzioSenderInstances = new HashMap<String, LogzioSender>();
    /** How long {@link #stop()} waits for the final drain. */
    private static final int FINAL_DRAIN_TIMEOUT_SEC = 20;
    /** Listener used when the caller passes a {@code null} URL. */
    private static final String DEFAULT_URL = "https://listener.logz.io:8071";

    private final BigQueue logsBuffer;
    private final File queueDirectory;
    private final URL logzioListenerUrl;
    /** True when the threshold is -1, i.e. the free-space check is disabled. */
    private final boolean dontCheckEnoughDiskSpace;
    private final String logzioToken;
    private final String logzioType;
    private final int drainTimeout;
    private final int fsPercentThreshold;
    private final String logzioUrl;
    private final int socketTimeout;
    private final int connectTimeout;
    private final boolean debug;
    private final SenderStatusReporter reporter;
    // Replaced under the class lock in getOrCreateSenderByType but read without it in start(),
    // hence volatile.
    private volatile ScheduledExecutorService tasksExecutor;
    private final int gcPersistedQueueFilesIntervalSeconds;
    /** Set while a scheduled drain is in progress so that drains never overlap. */
    private final AtomicBoolean drainRunning = new AtomicBoolean(false);

    /**
     * Creates a sender; use {@link #getOrCreateSenderByType} instead of calling this directly.
     *
     * @param logzioToken account token appended to the listener URL
     * @param logzioType log type appended to the listener URL
     * @param drainTimeout delay, in seconds, between scheduled drains
     * @param fsPercentThreshold used-space percentage at which logs are dropped; -1 disables the check
     * @param bufferDir directory of the queue; its parent and its name are handed to {@link BigQueue}
     * @param logzioUrl listener base URL, or {@code null} for the default listener
     * @param socketTimeout read timeout passed to the HTTP connection
     * @param connectTimeout connect timeout passed to the HTTP connection
     * @param debug whether debug messages are forwarded to the reporter
     * @param reporter sink for status, warning and error messages
     * @param tasksExecutor executor on which the periodic tasks are scheduled
     * @param gcPersistedQueueFilesIntervalSeconds delay, in seconds, between queue gc runs
     * @throws LogzioParameterErrorException if {@code bufferDir} is null or has no usable
     *         parent/name, or if the listener URL cannot be built
     */
    private LogzioSender(String logzioToken, String logzioType, int drainTimeout, int fsPercentThreshold, File bufferDir, String logzioUrl, int socketTimeout, int connectTimeout, boolean debug, SenderStatusReporter reporter, ScheduledExecutorService tasksExecutor, int gcPersistedQueueFilesIntervalSeconds) throws LogzioParameterErrorException {
        this.logzioToken = logzioToken;
        this.logzioType = logzioType;
        this.drainTimeout = drainTimeout;
        this.fsPercentThreshold = fsPercentThreshold;
        if (logzioUrl == null) {
            logzioUrl = DEFAULT_URL;
        }
        this.logzioUrl = logzioUrl;
        this.socketTimeout = socketTimeout;
        this.connectTimeout = connectTimeout;
        this.debug = debug;
        this.reporter = reporter;
        this.gcPersistedQueueFilesIntervalSeconds = gcPersistedQueueFilesIntervalSeconds;
        this.dontCheckEnoughDiskSpace = fsPercentThreshold == -1;
        if (bufferDir == null) {
            throw new LogzioParameterErrorException("bufferDir", "value is null.");
        }
        String dir = bufferDir.getAbsoluteFile().getParent();
        String queueNameDir = bufferDir.getName();
        if (dir == null || queueNameDir.isEmpty()) {
            throw new LogzioParameterErrorException("bufferDir", " value is empty: " + bufferDir.getAbsolutePath());
        }
        this.logsBuffer = new BigQueue(dir, queueNameDir);
        this.queueDirectory = bufferDir;
        try {
            this.logzioListenerUrl = new URL(this.logzioUrl + "/?token=" + this.logzioToken + "&type=" + this.logzioType);
        }
        catch (MalformedURLException e) {
            reporter.error("Can't connect to Logzio: " + e.getMessage(), e);
            throw new LogzioParameterErrorException("logzioUrl=" + logzioUrl + " token=" + logzioToken + " type=" + logzioType, "For some reason could not initialize URL. Cant recover..");
        }
        this.tasksExecutor = tasksExecutor;
        this.debug("Created new LogzioSender class");
    }

    /**
     * Returns the sender registered for {@code logzioType}, creating and registering one if none
     * exists yet.
     *
     * <p>When a sender already exists, every argument except {@code logzioType}, {@code reporter}
     * and {@code tasksExecutor} is ignored. The existing sender's executor is swapped for
     * {@code tasksExecutor} only if the old one has terminated; the periodic tasks are not
     * rescheduled here.
     *
     * @return the existing or newly created sender for the type
     * @throws LogzioParameterErrorException if a new sender has to be created and its parameters
     *         are invalid (see the constructor)
     */
    public static synchronized LogzioSender getOrCreateSenderByType(String logzioToken, String logzioType, int drainTimeout, int fsPercentThreshold, File bufferDir, String logzioUrl, int socketTimeout, int connectTimeout, boolean debug, SenderStatusReporter reporter, ScheduledExecutorService tasksExecutor, int gcPersistedQueueFilesIntervalSeconds) throws LogzioParameterErrorException {
        LogzioSender logzioSenderInstance = logzioSenderInstances.get(logzioType);
        if (logzioSenderInstance == null) {
            if (bufferDir == null) {
                throw new LogzioParameterErrorException("bufferDir", "null");
            }
            LogzioSender logzioSender = new LogzioSender(logzioToken, logzioType, drainTimeout, fsPercentThreshold, bufferDir, logzioUrl, socketTimeout, connectTimeout, debug, reporter, tasksExecutor, gcPersistedQueueFilesIntervalSeconds);
            logzioSenderInstances.put(logzioType, logzioSender);
            return logzioSender;
        }
        reporter.info("Already found appender configured for type " + logzioType + ", re-using the same one.");
        if (logzioSenderInstance.tasksExecutor.isTerminated()) {
            reporter.info("The old task executor is terminated! replacing it with a new one");
            logzioSenderInstance.tasksExecutor = tasksExecutor;
        }
        return logzioSenderInstance;
    }

    /** Schedules the periodic drain and the periodic queue garbage collection, both starting immediately. */
    public void start() {
        ScheduledExecutorService executor = this.tasksExecutor;
        executor.scheduleWithFixedDelay(this::drainQueueAndSend, 0L, this.drainTimeout, TimeUnit.SECONDS);
        executor.scheduleWithFixedDelay(this::gcBigQueue, 0L, this.gcPersistedQueueFilesIntervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Runs one final drain on a dedicated single-thread executor and waits up to
     * {@link #FINAL_DRAIN_TIMEOUT_SEC} seconds for it. The dedicated executor is always shut down
     * afterwards (interrupting the drain if it is still running); the tasks executor is not touched.
     */
    public void stop() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        this.debug("Got stop request, Submitting a final drain queue task to drain before shutdown. Will timeout in " + FINAL_DRAIN_TIMEOUT_SEC + " seconds.");
        try {
            executorService.submit(this::drainQueue).get(FINAL_DRAIN_TIMEOUT_SEC, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to whoever called stop().
            Thread.currentThread().interrupt();
            this.debug("Waited " + FINAL_DRAIN_TIMEOUT_SEC + " seconds, but could not finish draining. quitting.", e);
        }
        catch (ExecutionException | TimeoutException e) {
            this.debug("Waited " + FINAL_DRAIN_TIMEOUT_SEC + " seconds, but could not finish draining. quitting.", e);
        }
        finally {
            executorService.shutdownNow();
        }
    }

    /** Triggers the queue's gc; any failure is reported instead of propagated so the scheduled task keeps running. */
    public void gcBigQueue() {
        try {
            this.logsBuffer.gc();
        }
        catch (Throwable e) {
            this.reporter.error("Uncaught error from BigQueue.gc()", e);
        }
    }

    /**
     * Drains the queue unless a drain started through this method is already in progress.
     * Exceptions are reported rather than propagated so the scheduled task is never cancelled.
     */
    public void drainQueueAndSend() {
        // Only the call that actually flipped the flag may clear it again; otherwise a skipped
        // call would release the guard on behalf of the drain that is still running.
        boolean acquired = false;
        try {
            acquired = this.drainRunning.compareAndSet(false, true);
            if (!acquired) {
                this.debug("Drain is running so we won't run another one in parallel");
                return;
            }
            this.drainQueue();
        }
        catch (Exception e) {
            this.reporter.error("Uncaught error from Logz.io sender", e);
        }
        finally {
            if (acquired) {
                this.drainRunning.set(false);
            }
        }
    }

    /**
     * Serializes {@code jsonMessage} followed by a newline as UTF-8 and queues it, unless the
     * disk-space check rejects it.
     *
     * @param jsonMessage the log entry to queue
     */
    public void send(JsonObject jsonMessage) {
        this.enqueue((jsonMessage + "\n").getBytes(StandardCharsets.UTF_8));
    }

    /** Adds {@code message} to the queue if the disk-space check passes; otherwise drops it. */
    private void enqueue(byte[] message) {
        if (this.isEnoughDiskSpace()) {
            this.logsBuffer.enqueue(message);
        }
    }

    /**
     * Compares the used-space percentage of the queue directory's file system with the
     * configured threshold, warning through the reporter when the threshold is reached.
     *
     * @return {@code true} if the check is disabled or usage is below the threshold
     */
    private boolean isEnoughDiskSpace() {
        if (this.dontCheckEnoughDiskSpace) {
            return true;
        }
        // If getTotalSpace() is 0 the ratio is NaN, which casts to 0, so usage reads as
        // 100 percent and the message is dropped.
        int actualUsedFsPercent = 100 - (int)((double)this.queueDirectory.getUsableSpace() / (double)this.queueDirectory.getTotalSpace() * 100.0);
        if (actualUsedFsPercent >= this.fsPercentThreshold) {
            this.reporter.warning(String.format("Logz.io: Dropping logs, as FS used space on %s is %d percent, and the drop threshold is %d percent", this.queueDirectory.getAbsolutePath(), actualUsedFsPercent, this.fsPercentThreshold));
            return false;
        }
        return true;
    }

    /**
     * Dequeues messages until the queue is empty or the batch has reached
     * {@link #MAX_SIZE_IN_BYTES}; null and empty entries are skipped.
     *
     * @return the dequeued batch, possibly empty
     */
    private List<FormattedLogMessage> dequeueUpToMaxBatchSize() {
        List<FormattedLogMessage> batch = new ArrayList<FormattedLogMessage>();
        // Running total instead of re-summing the whole batch after every element.
        int batchBytes = 0;
        while (batchBytes < MAX_SIZE_IN_BYTES && !this.logsBuffer.isEmpty()) {
            byte[] message = this.logsBuffer.dequeue();
            if (message == null || message.length <= 0) {
                continue;
            }
            FormattedLogMessage formatted = new FormattedLogMessage(message);
            batch.add(formatted);
            batchBytes += formatted.getSize();
        }
        return batch;
    }

    /**
     * Sends batches until the queue is empty, a batch fails after all retries (it is then put
     * back on the queue and draining stops until the next run), or the thread is interrupted.
     */
    private void drainQueue() {
        this.debug("Attempting to drain queue");
        while (!this.logsBuffer.isEmpty()) {
            List<FormattedLogMessage> logsList = this.dequeueUpToMaxBatchSize();
            if (logsList.isEmpty()) {
                // Only null/empty entries were dequeued; there is nothing worth a request.
                continue;
            }
            try {
                this.sendToLogzio(logsList);
            }
            catch (LogzioServerErrorException e) {
                this.debug("Could not send log to logz.io: ", e);
                this.debug("Will retry in the next interval");
                for (FormattedLogMessage logMessage : logsList) {
                    this.enqueue(logMessage.getMessage());
                }
                break;
            }
            // Thread.interrupted() also clears the flag, matching the existing contract.
            if (Thread.interrupted()) {
                this.debug("Stopping drainQueue to thread being interrupted");
                break;
            }
        }
    }

    /** Returns the sum of {@code getSize()} over {@code logMessages}. */
    private int sizeInBytes(List<FormattedLogMessage> logMessages) {
        int totalSize = 0;
        for (FormattedLogMessage currLog : logMessages) {
            totalSize += currLog.getSize();
        }
        return totalSize;
    }

    /**
     * Concatenates the raw bytes of {@code messages} in order. No separator is added here; the
     * trailing newline is part of each message as queued by {@link #send}.
     */
    private byte[] toNewLineSeparatedByteArray(List<FormattedLogMessage> messages) {
        try {
            ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(this.sizeInBytes(messages));
            for (FormattedLogMessage currMessage : messages) {
                byteOutputStream.write(currMessage.getMessage());
            }
            return byteOutputStream.toByteArray();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Decides whether a response warrants another attempt.
     *
     * @return {@code false} for 200, 400 and 401 (the bulk is either delivered or would be
     *         rejected again), {@code true} for everything else
     */
    private boolean shouldRetry(int statusCode) {
        switch (statusCode) {
            case HttpURLConnection.HTTP_OK:
            case HttpURLConnection.HTTP_BAD_REQUEST:
            case HttpURLConnection.HTTP_UNAUTHORIZED:
                return false;
            default:
                return true;
        }
    }

    /**
     * POSTs {@code messages} as one bulk, making up to {@link #MAX_RETRIES_ATTEMPTS} attempts with
     * a sleep that starts at {@link #INITIAL_WAIT_BEFORE_RETRY_MS} and doubles each time.
     *
     * <p>A 400 or 401 response is reported and the bulk is dropped without retrying. If the thread
     * is interrupted while sleeping between attempts, the interrupt flag is restored and the
     * method returns normally. NOTE(review): in that case the caller does not put the bulk back
     * on the queue, so it is lost; confirm whether that is intended.
     *
     * @param messages the batch to send
     * @throws LogzioServerErrorException if the last attempt still ended in a retryable status
     *         or an I/O error
     */
    private void sendToLogzio(List<FormattedLogMessage> messages) throws LogzioServerErrorException {
        try {
            byte[] payload = this.toNewLineSeparatedByteArray(messages);
            int currentRetrySleep = INITIAL_WAIT_BEFORE_RETRY_MS;
            for (int currTry = 1; currTry <= MAX_RETRIES_ATTEMPTS; ++currTry) {
                boolean shouldRetry = true;
                int responseCode = 0;
                String responseMessage = "";
                IOException savedException = null;
                // Kept local: the final drain in stop() can run concurrently with a scheduled drain.
                HttpURLConnection connection = null;
                try {
                    connection = (HttpURLConnection)this.logzioListenerUrl.openConnection();
                    connection.setRequestMethod("POST");
                    // NOTE(review): HttpURLConnection may ignore a manually set Content-Length; kept as in the original.
                    connection.setRequestProperty("Content-length", String.valueOf(payload.length));
                    connection.setRequestProperty("Content-Type", "text/plain");
                    connection.setReadTimeout(this.socketTimeout);
                    connection.setConnectTimeout(this.connectTimeout);
                    connection.setDoOutput(true);
                    connection.setDoInput(true);
                    try (OutputStream requestBody = connection.getOutputStream()) {
                        requestBody.write(payload);
                    }
                    responseCode = connection.getResponseCode();
                    responseMessage = connection.getResponseMessage();
                    if (responseCode == HttpURLConnection.HTTP_BAD_REQUEST) {
                        this.reportBadRequest(connection);
                    }
                    if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
                        this.reporter.error("Logz.io: Got forbidden! Your token is not right. Unfortunately, dropping logs. Message: " + responseMessage);
                    }
                    shouldRetry = this.shouldRetry(responseCode);
                }
                catch (IOException e) {
                    savedException = e;
                    this.debug("Got IO exception - " + e.getMessage());
                }
                finally {
                    if (connection != null) {
                        LogzioSender.releaseConnection(connection, responseCode > 0);
                    }
                }
                if (!shouldRetry) {
                    if (responseCode == HttpURLConnection.HTTP_OK) {
                        this.debug("Successfully sent bulk to logz.io, size: " + payload.length);
                    } else {
                        this.debug("Dropping bulk after non-retryable HTTP " + responseCode + " from logz.io, size: " + payload.length);
                    }
                    break;
                }
                if (currTry == MAX_RETRIES_ATTEMPTS) {
                    if (savedException != null) {
                        this.reporter.error("Got IO exception on the last bulk try to logz.io", savedException);
                    }
                    throw new LogzioServerErrorException("Got HTTP " + responseCode + " code from logz.io, with message: " + responseMessage);
                }
                this.debug("Could not send log to logz.io, retry (" + currTry + "/" + MAX_RETRIES_ATTEMPTS + ")");
                this.debug("Sleeping for " + currentRetrySleep + " ms and will try again.");
                Thread.sleep(currentRetrySleep);
                currentRetrySleep *= 2;
            }
        }
        catch (InterruptedException e) {
            this.debug("Got interrupted exception");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads the error body of a 400 response and forwards it to the reporter as a warning.
     * Nothing is reported when the connection exposes no error stream. A failure while reading
     * is only logged, so it cannot turn a non-retryable 400 into a retry.
     */
    private void reportBadRequest(HttpURLConnection connection) {
        InputStream errorStream = connection.getErrorStream();
        if (errorStream == null) {
            return;
        }
        StringBuilder problemDescription = new StringBuilder();
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(errorStream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                problemDescription.append("\n").append(line);
            }
        }
        catch (IOException e) {
            this.debug("Could not read the complete 400 response body - " + e.getMessage());
        }
        this.reporter.warning(String.format("Got 400 from logzio, here is the output: %s", problemDescription));
    }

    /**
     * Releases the resources held by a finished attempt. When a response was received, its body
     * is read to the end and closed; when the attempt failed before a response arrived, or the
     * body cannot be read, the connection is disconnected instead.
     */
    private static void releaseConnection(HttpURLConnection connection, boolean gotResponse) {
        if (!gotResponse) {
            connection.disconnect();
            return;
        }
        try {
            InputStream remaining = connection.getErrorStream();
            if (remaining == null) {
                remaining = connection.getInputStream();
            }
            try (InputStream responseBody = remaining) {
                byte[] scratch = new byte[1024];
                while (responseBody.read(scratch) != -1) {
                    // Discard: only the status line matters to the sender.
                }
            }
        }
        catch (IOException ignored) {
            // Best effort; the body may already have been consumed and closed.
            connection.disconnect();
        }
    }

    /** Forwards {@code message} to the reporter at info level when debug output is enabled. */
    private void debug(String message) {
        if (this.debug) {
            this.reporter.info("DEBUG: " + message);
        }
    }

    /** Forwards {@code message} and {@code e} to the reporter at info level when debug output is enabled. */
    private void debug(String message, Throwable e) {
        if (this.debug) {
            this.reporter.info("DEBUG: " + message, e);
        }
    }
}

