/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.emitter.ambari.metrics;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.druid.emitter.ambari.metrics.AmbariMetricsEmitterConfig;
import org.apache.druid.emitter.ambari.metrics.DruidToTimelineMetricConverter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

public class AmbariMetricsEmitter
extends AbstractTimelineMetricsSink
implements Emitter {
    private static final Logger log = new Logger(AmbariMetricsEmitter.class);
    private final DruidToTimelineMetricConverter timelineMetricConverter;
    private final List<Emitter> emitterList;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final LinkedBlockingQueue<TimelineMetric> eventsQueue;
    private final AmbariMetricsEmitterConfig config;
    private final String collectorURI;
    private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = 60000L;
    private static final Pattern DOT_OR_WHITESPACE_PATTERN = Pattern.compile("[\\s]+|[.]+");
    private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AmbariMetricsEmitter-%s").build());
    private final AtomicLong countLostEvents = new AtomicLong(0L);

    public AmbariMetricsEmitter(AmbariMetricsEmitterConfig config, List<Emitter> emitterList) {
        this.config = config;
        this.emitterList = emitterList;
        this.timelineMetricConverter = config.getDruidToTimelineEventConverter();
        this.eventsQueue = new LinkedBlockingQueue(config.getMaxQueueSize());
        this.collectorURI = StringUtils.format((String)"%s://%s:%s%s", (Object[])new Object[]{config.getProtocol(), config.getHostname(), config.getPort(), "/ws/v1/timeline/metrics"});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        AtomicBoolean atomicBoolean = this.started;
        synchronized (atomicBoolean) {
            log.info("Starting Ambari Metrics Emitter.", new Object[0]);
            if (!this.started.get()) {
                if ("https".equals(this.config.getProtocol())) {
                    this.loadTruststore(this.config.getTrustStorePath(), this.config.getTrustStoreType(), this.config.getTrustStorePassword());
                }
                this.exec.scheduleAtFixedRate(new ConsumerRunnable(), this.config.getFlushPeriod(), this.config.getFlushPeriod(), TimeUnit.MILLISECONDS);
                this.started.set(true);
            }
        }
    }

    public void emit(Event event) {
        if (!this.started.get()) {
            throw new ISE("Emit called unexpectedly before service start", new Object[0]);
        }
        if (event instanceof ServiceMetricEvent) {
            TimelineMetric timelineEvent = this.timelineMetricConverter.druidEventToTimelineMetric((ServiceMetricEvent)event);
            if (timelineEvent == null) {
                return;
            }
            try {
                boolean isSuccessful = this.eventsQueue.offer(timelineEvent, this.config.getEmitWaitTime(), TimeUnit.MILLISECONDS);
                if (!isSuccessful && this.countLostEvents.getAndIncrement() % 1000L == 0L) {
                    log.error("Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency", new Object[]{this.countLostEvents.get()});
                }
            }
            catch (InterruptedException e) {
                log.error((Throwable)e, "got interrupted with message [%s]", new Object[]{e.getMessage()});
                Thread.currentThread().interrupt();
            }
        } else if (event instanceof AlertEvent) {
            for (Emitter emitter : this.emitterList) {
                emitter.emit(event);
            }
        } else {
            throw new ISE("unknown event type [%s]", new Object[]{event.getClass()});
        }
    }

    protected String getCollectorUri(String host) {
        return this.constructTimelineMetricUri(this.getCollectorProtocol(), host, this.getCollectorPort());
    }

    protected String getCollectorProtocol() {
        return this.config.getProtocol();
    }

    protected String getCollectorPort() {
        return String.valueOf(this.config.getPort());
    }

    protected int getTimeoutSeconds() {
        return 60;
    }

    protected String getZookeeperQuorum() {
        return null;
    }

    protected Collection<String> getConfiguredCollectorHosts() {
        return Collections.singleton(this.config.getHostname());
    }

    protected String getHostname() {
        return this.config.getHostname();
    }

    protected boolean isHostInMemoryAggregationEnabled() {
        return false;
    }

    protected int getHostInMemoryAggregationPort() {
        return 0;
    }

    protected String getHostInMemoryAggregationProtocol() {
        return "";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flush() {
        AtomicBoolean atomicBoolean = this.started;
        synchronized (atomicBoolean) {
            block6: {
                if (this.started.get()) {
                    ScheduledFuture<?> future = this.exec.schedule(new ConsumerRunnable(), 0L, TimeUnit.MILLISECONDS);
                    try {
                        future.get(60000L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException | ExecutionException | TimeoutException e) {
                        if (!(e instanceof InterruptedException)) break block6;
                        throw new RuntimeException("interrupted flushing elements from queue", e);
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        AtomicBoolean atomicBoolean = this.started;
        synchronized (atomicBoolean) {
            this.flush();
            this.exec.shutdown();
            this.started.set(false);
        }
    }

    protected static String sanitize(String namespace) {
        return DOT_OR_WHITESPACE_PATTERN.matcher(namespace).replaceAll("_");
    }

    private class ConsumerRunnable
    implements Runnable {
        private ConsumerRunnable() {
        }

        @Override
        public void run() {
            try {
                int batchSize = AmbariMetricsEmitter.this.config.getBatchSize();
                TimelineMetrics metrics = new TimelineMetrics();
                while (AmbariMetricsEmitter.this.eventsQueue.size() > 0 && !AmbariMetricsEmitter.this.exec.isShutdown()) {
                    try {
                        TimelineMetric metricEvent = (TimelineMetric)AmbariMetricsEmitter.this.eventsQueue.poll(AmbariMetricsEmitter.this.config.getWaitForEventTime(), TimeUnit.MILLISECONDS);
                        if (metricEvent == null) continue;
                        metrics.addOrMergeTimelineMetric(metricEvent);
                        if (metrics.getMetrics().size() != batchSize) continue;
                        AmbariMetricsEmitter.this.emitMetrics(metrics);
                        log.debug("sent [%d] events", new Object[]{metrics.getMetrics().size()});
                        metrics = new TimelineMetrics();
                    }
                    catch (InterruptedException e) {
                        log.error((Throwable)e, e.getMessage(), new Object[0]);
                        Thread.currentThread().interrupt();
                    }
                }
                if (metrics.getMetrics().size() > 0) {
                    AmbariMetricsEmitter.this.emitMetrics(metrics);
                    log.debug("sent [%d] events", new Object[]{metrics.getMetrics().size()});
                }
            }
            catch (Exception e) {
                log.error((Throwable)e, e.getMessage(), new Object[0]);
            }
        }
    }
}

