/*
 * Decompiled with CFR 0.152.
 */
package com.github.yingzhuo.logback.flume;

import ch.qos.logback.core.spi.ContextAware;
import com.github.yingzhuo.logback.flume.EventReporter;
import com.github.yingzhuo.logback.flume.RemoteFlumeAgent;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Event;

/**
 * Buffers Flume {@link Event}s in a bounded in-memory queue and hands them, in batches,
 * to an {@link EventReporter} from a dedicated daemon thread.
 *
 * <p>Instances are obtained through {@link #create}. Events are queued with
 * {@link #send(Event)}; the background thread is asked to terminate with {@link #stop()}.
 */
public class FlumeAvroManager {
    /** Sequence used to give every worker thread a unique name. */
    private static final AtomicLong threadSequence = new AtomicLong(1L);
    /** Multiplied by the number of agents to obtain the client's "max-attempts" property. */
    private static final int MAX_RECONNECTS = 3;
    /** Value (milliseconds) written to the "request-timeout" and "connect-timeout" properties. */
    private static final int MINIMUM_TIMEOUT = 1000;
    /** Upper bound, and default, of the reporting window in milliseconds. */
    private static final long MAXIMUM_REPORTING_MILLIS = 10000L;
    /** Lower bound of the reporting window in milliseconds. */
    private static final long MINIMUM_REPORTING_MILLIS = 100L;
    /** Batch size used when the caller does not supply a usable one. */
    private static final int DEFAULT_BATCH_SIZE = 50;
    /** Third {@link EventReporter} constructor argument used when none is supplied. */
    private static final int DEFAULT_REPORTER_MAX_THREADPOOL_SIZE = 2;
    /** Fourth {@link EventReporter} constructor argument used when none is supplied. */
    private static final int DEFAULT_REPORTER_MAX_QUEUE_SIZE = 50;
    /** Capacity of the queue sitting between {@link #send(Event)} and the worker thread. */
    private static final int EVENT_QUEUE_CAPACITY = 1000;

    private final ContextAware loggingContext;
    private final BlockingQueue<Event> evQueue;
    private final AsyncThread asyncThread;
    private final EventReporter reporter;

    /**
     * Builds the reporter, the event queue and the worker thread, then starts the thread.
     *
     * @param properties                client properties handed to the {@link EventReporter}
     * @param reportingWindow           maximum time in milliseconds spent filling one batch;
     *                                  {@code null} or out-of-range values are normalised
     * @param batchSize                 maximum number of events per batch; {@code null} or a
     *                                  non-positive value selects {@link #DEFAULT_BATCH_SIZE}
     * @param reporterMaxThreadPoolSize reporter argument, {@code null} selects the default
     * @param reporterMaxQueueSize      reporter argument, {@code null} selects the default
     * @param contextAware              sink for status messages
     */
    private FlumeAvroManager(Properties properties, Long reportingWindow, Integer batchSize,
                             Integer reporterMaxThreadPoolSize, Integer reporterMaxQueueSize,
                             ContextAware contextAware) {
        this.loggingContext = contextAware;

        int poolSize = reporterMaxThreadPoolSize == null
                ? DEFAULT_REPORTER_MAX_THREADPOOL_SIZE : reporterMaxThreadPoolSize;
        int reporterQueueSize = reporterMaxQueueSize == null
                ? DEFAULT_REPORTER_MAX_QUEUE_SIZE : reporterMaxQueueSize;
        this.reporter = new EventReporter(properties, this.loggingContext, poolSize, reporterQueueSize);

        this.evQueue = new ArrayBlockingQueue<Event>(EVENT_QUEUE_CAPACITY);

        long window = this.harmonizeReportingWindow(reportingWindow);
        int effectiveBatchSize = batchSize == null ? DEFAULT_BATCH_SIZE : batchSize;
        if (effectiveBatchSize < 1) {
            // A zero-sized batch would make the worker spin without ever sending anything,
            // and a negative one would fail when the batch array is allocated.
            this.loggingContext.addWarn("Invalid batch size " + effectiveBatchSize
                    + ", falling back to " + DEFAULT_BATCH_SIZE);
            effectiveBatchSize = DEFAULT_BATCH_SIZE;
        }
        this.asyncThread = new AsyncThread(this.evQueue, effectiveBatchSize, window);

        this.loggingContext.addInfo("Created a new flume agent with properties: " + properties.toString());
        // Started last so that every final field is assigned before the worker reads it.
        this.asyncThread.start();
    }

    /**
     * Creates a manager for the given agents.
     *
     * <p>The client properties are derived from {@code list}; entries of {@code properties}
     * are applied on top and therefore override the derived values.
     *
     * @param list                      remote agents to report to
     * @param properties                additional/overriding client properties, may be {@code null}
     * @param batchSize                 maximum events per batch, may be {@code null}
     * @param reportingWindow           batch fill time limit in milliseconds, may be {@code null}
     * @param reporterMaxThreadPoolSize reporter argument, may be {@code null}
     * @param reporterMaxQueueSize      reporter argument, may be {@code null}
     * @param contextAware              sink for status messages
     * @return a started manager, or {@code null} when {@code list} is {@code null} or empty
     */
    public static FlumeAvroManager create(List<RemoteFlumeAgent> list, Properties properties,
                                          Integer batchSize, Long reportingWindow,
                                          Integer reporterMaxThreadPoolSize, Integer reporterMaxQueueSize,
                                          ContextAware contextAware) {
        if (list == null || list.isEmpty()) {
            contextAware.addError("No valid agents configured");
            return null;
        }
        Properties effective = FlumeAvroManager.buildFlumeProperties(list);
        if (properties != null) {
            // putAll(null) would throw; a missing override set simply means "no overrides".
            effective.putAll(properties);
        }
        return new FlumeAvroManager(effective, reportingWindow, batchSize,
                reporterMaxThreadPoolSize, reporterMaxQueueSize, contextAware);
    }

    /**
     * Derives the client properties from the list of agents: one {@code hosts.hN} entry per
     * agent, the space separated alias list under {@code hosts}, retry/timeout settings and,
     * when more than one agent is present, round-robin load balancing.
     */
    private static Properties buildFlumeProperties(List<RemoteFlumeAgent> list) {
        Properties props = new Properties();
        StringBuilder aliases = new StringBuilder(list.size() * 4);
        int agentCount = 0;
        for (RemoteFlumeAgent agent : list) {
            String alias = "h" + agentCount++;
            props.put("hosts." + alias, agent.getHostname() + ':' + agent.getPort());
            // Every alias is followed by a space, including the last one.
            aliases.append(alias).append(" ");
        }
        props.put("hosts", aliases.toString());
        props.put("max-attempts", Integer.toString(MAX_RECONNECTS * list.size()));
        props.put("request-timeout", Integer.toString(MINIMUM_TIMEOUT));
        props.put("connect-timeout", Integer.toString(MINIMUM_TIMEOUT));
        if (agentCount > 1) {
            props.put("client.type", "default_loadbalance");
            props.put("host-selector", "round_robin");
        }
        props.put("backoff", "true");
        props.put("maxBackoff", "10000");
        return props;
    }

    /**
     * Clamps the requested reporting window to
     * [{@link #MINIMUM_REPORTING_MILLIS}, {@link #MAXIMUM_REPORTING_MILLIS}];
     * {@code null} yields the maximum.
     */
    private long harmonizeReportingWindow(Long requested) {
        if (requested == null) {
            return MAXIMUM_REPORTING_MILLIS;
        }
        return Math.max(MINIMUM_REPORTING_MILLIS, Math.min(MAXIMUM_REPORTING_MILLIS, requested));
    }

    /** Asks the worker thread to terminate; the thread shuts the reporter down when it exits. */
    public void stop() {
        this.asyncThread.shutdown();
    }

    /**
     * Queues an event for asynchronous delivery.
     *
     * <p>A {@code null} event is reported as a warning and ignored. When the queue is full
     * the event is dropped and an error is reported, instead of throwing into the caller.
     *
     * @param event the event to queue
     */
    public void send(Event event) {
        if (event == null) {
            this.loggingContext.addWarn("Trying to send a NULL event");
            return;
        }
        this.loggingContext.addInfo("Queuing a new event: " + event.toString());
        // offer() rather than add(): add() throws IllegalStateException on a full queue.
        if (!this.evQueue.offer(event)) {
            this.loggingContext.addError("Event queue is full (capacity " + EVENT_QUEUE_CAPACITY
                    + "), dropping event: " + event.toString());
        }
    }

    /**
     * Daemon thread that drains the event queue into batches and forwards each non-empty
     * batch to the enclosing manager's {@link EventReporter}.
     */
    private class AsyncThread
    extends Thread {
        private final BlockingQueue<Event> queue;
        private final long reportingWindow;
        private final int batchSize;
        /** Set by {@link #shutdown()}; checked once per batch cycle. */
        private volatile boolean shutdown = false;

        private AsyncThread(BlockingQueue<Event> queue, int batchSize, long reportingWindow) {
            this.queue = queue;
            this.batchSize = batchSize;
            this.reportingWindow = reportingWindow;
            this.setDaemon(true);
            this.setName("FlumeAvroManager-" + threadSequence.getAndIncrement());
            FlumeAvroManager.this.loggingContext.addInfo("Started a new " + AsyncThread.class.getSimpleName() + " thread");
        }

        /**
         * Repeats until shutdown is requested: collect up to {@code batchSize} events, or
         * whatever arrived within {@code reportingWindow} milliseconds, and report them.
         * The reporter is shut down when the loop ends.
         */
        @Override
        public void run() {
            while (!this.shutdown) {
                long now = System.currentTimeMillis();
                final long deadline = now + this.reportingWindow;
                final Event[] batch = new Event[this.batchSize];
                int count = 0;
                try {
                    while (count < this.batchSize && System.currentTimeMillis() < deadline) {
                        // max() keeps "now" from moving backwards if the wall clock is adjusted.
                        now = Math.max(System.currentTimeMillis(), now);
                        Event polled = this.queue.poll(deadline - now, TimeUnit.MILLISECONDS);
                        if (polled != null) {
                            batch[count++] = polled;
                        }
                    }
                }
                catch (InterruptedException interruptedException) {
                    // The interrupt flag is intentionally not restored: the loop keeps running
                    // until shutdown() is called, and a set flag would make every later poll()
                    // fail immediately. Whatever was collected so far is still sent below.
                    FlumeAvroManager.this.loggingContext.addWarn(interruptedException.getLocalizedMessage(), interruptedException);
                }
                if (count <= 0) {
                    continue;
                }
                final Event[] payload;
                if (count == batch.length) {
                    payload = batch;
                } else {
                    // Trim so the reporter never sees trailing null slots.
                    payload = new Event[count];
                    System.arraycopy(batch, 0, payload, 0, count);
                }
                FlumeAvroManager.this.loggingContext.addInfo("Sending " + count + " event(s) to the EventReporter");
                try {
                    FlumeAvroManager.this.reporter.report(payload);
                }
                catch (RejectedExecutionException rejectedExecutionException) {
                    FlumeAvroManager.this.loggingContext.addError("Logging events batch rejected by EventReporter. Check reporter connectivity or consider increasing reporterMaxThreadPoolSize or reporterMaxQueueSize", rejectedExecutionException);
                }
            }
            FlumeAvroManager.this.reporter.shutdown();
        }

        /** Requests termination; takes effect once the current batch cycle completes. */
        public void shutdown() {
            FlumeAvroManager.this.loggingContext.addInfo("Shutting down command received");
            this.shutdown = true;
        }
    }
}

