/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.system.hdfs;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.hdfs.HdfsConfig;
import org.apache.samza.system.hdfs.HdfsSystemAdmin;
import org.apache.samza.system.hdfs.reader.HdfsReaderFactory;
import org.apache.samza.system.hdfs.reader.MultiFileHdfsReader;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BlockingEnvelopeMap} whose per-partition queues are filled by
 * {@link MultiFileHdfsReader}s running on background threads.
 *
 * <p>Lifecycle as implemented here: {@link #register} creates one reader per
 * {@link SystemStreamPartition}; {@link #start} submits one {@link ReaderRunnable}
 * per registered reader to a cached thread pool; {@link #poll} restarts any reader
 * task that terminated with an exception and then delegates to the superclass;
 * {@link #stop} raises the shutdown flag and shuts the pool down.
 */
public class HdfsSystemConsumer extends BlockingEnvelopeMap {
    private static final Logger LOG = LoggerFactory.getLogger(HdfsSystemConsumer.class);
    private static final String METRICS_GROUP_NAME = HdfsSystemConsumer.class.getName();

    private final HdfsReaderFactory.ReaderType readerType;
    private final String stagingDirectory;
    /** Capacity handed to every queue created by {@link #newBlockingQueue()}. */
    private final int bufferCapacity;
    /** Retry limit passed to every {@link MultiFileHdfsReader} created in {@link #register}. */
    private final int numMaxRetries;
    /** Created in {@link #start()}; remains {@code null} until then. */
    private ExecutorService executorService;
    /** Stream name -> (partition -> descriptor list), loaded on first access per stream. */
    private final LoadingCache<String, Map<Partition, List<String>>> cachedPartitionDescriptorMap;
    private final Map<SystemStreamPartition, MultiFileHdfsReader> readers;
    /** Handle of the most recently submitted {@link ReaderRunnable} for each partition. */
    private final Map<SystemStreamPartition, Future<?>> readerRunnableStatus;
    /** Read by the reader threads in {@link #doPoll}; set by {@link #stop()}. */
    private volatile boolean isShutdown;
    private final HdfsSystemConsumerMetrics consumerMetrics;
    private final HdfsConfig hdfsConfig;

    /**
     * Reads this system's settings from {@code config} and prepares the lazily
     * loaded partition-descriptor cache. No reader is created and no thread is
     * started here.
     *
     * @param systemName      name used to look up the per-system values in {@link HdfsConfig}
     * @param config          job configuration wrapped in an {@link HdfsConfig}
     * @param consumerMetrics metrics holder; its registry is also passed to the superclass
     */
    public HdfsSystemConsumer(String systemName, Config config, HdfsSystemConsumerMetrics consumerMetrics) {
        super(consumerMetrics.getMetricsRegistry());
        this.hdfsConfig = new HdfsConfig(config);
        this.readerType = HdfsReaderFactory.getType(this.hdfsConfig.getFileReaderType(systemName));
        this.stagingDirectory = this.hdfsConfig.getStagingDirectory(systemName);
        this.bufferCapacity = this.hdfsConfig.getConsumerBufferCapacity(systemName);
        this.numMaxRetries = this.hdfsConfig.getConsumerNumMaxRetries(systemName);
        this.readers = new ConcurrentHashMap<SystemStreamPartition, MultiFileHdfsReader>();
        this.readerRunnableStatus = new ConcurrentHashMap<SystemStreamPartition, Future<?>>();
        this.isShutdown = false;
        this.consumerMetrics = consumerMetrics;
        this.cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(new CacheLoader<String, Map<Partition, List<String>>>() {

            /**
             * Loads the partition descriptor map for one stream from the staging directory.
             *
             * @throws SamzaException if the configured staging directory is blank
             */
            @Override
            public Map<Partition, List<String>> load(String streamName) {
                Validate.notEmpty(streamName);
                if (StringUtils.isBlank(HdfsSystemConsumer.this.stagingDirectory)) {
                    throw new SamzaException("Staging directory can't be empty. Is this not a yarn job (currently hdfs system consumer only works in the same yarn environment on which hdfs is running)? Is STAGING_DIRECTORY (" + HdfsConfig.STAGING_DIRECTORY() + ") not set (see HdfsConfig.scala)?");
                }
                return HdfsSystemAdmin.obtainPartitionDescriptorMap(HdfsSystemConsumer.this.stagingDirectory, streamName);
            }
        });
    }

    /**
     * Creates the thread pool and submits one {@link ReaderRunnable} for every
     * reader registered so far, remembering each task's {@link Future}.
     */
    @Override
    public void start() {
        LOG.info("HdfsSystemConsumer started with {} readers", this.readers.size());
        this.executorService = Executors.newCachedThreadPool();
        for (Map.Entry<SystemStreamPartition, MultiFileHdfsReader> entry : this.readers.entrySet()) {
            Future<?> status = this.executorService.submit(new ReaderRunnable(entry.getValue()));
            this.readerRunnableStatus.put(entry.getKey(), status);
        }
    }

    /**
     * Raises the shutdown flag observed by the reader loops and shuts the thread
     * pool down. Safe to call when {@link #start()} was never invoked.
     */
    @Override
    public void stop() {
        LOG.info("Received request to stop HdfsSystemConsumer.");
        this.isShutdown = true;
        // The pool only exists after start(); without this guard stop() would throw a
        // NullPointerException on a consumer that was never started.
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        LOG.info("HdfsSystemConsumer stopped.");
    }

    /**
     * Looks up the descriptor list for the given partition in the cached
     * per-stream descriptor map, loading the map on first use.
     *
     * @return the list mapped to the partition, or {@code null} if the loaded map
     *         has no entry for it
     * @throws SamzaException if loading the descriptor map fails
     */
    private List<String> getPartitionDescriptor(SystemStreamPartition systemStreamPartition) {
        String streamName = systemStreamPartition.getStream();
        Partition partition = systemStreamPartition.getPartition();
        try {
            return this.cachedPartitionDescriptorMap.get(streamName).get(partition);
        }
        catch (ExecutionException e) {
            throw new SamzaException("Failed to obtain descriptor for " + systemStreamPartition, e);
        }
    }

    /**
     * Supplies a bounded queue sized by the configured buffer capacity.
     */
    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<IncomingMessageEnvelope>(this.bufferCapacity);
    }

    /**
     * Registers the partition with the superclass, creates its
     * {@link MultiFileHdfsReader} positioned by {@code offset}, and registers the
     * partition with the metrics holder.
     *
     * @param systemStreamPartition partition to consume
     * @param offset                starting offset handed to the reader
     */
    @Override
    public void register(SystemStreamPartition systemStreamPartition, String offset) {
        LOG.info("HdfsSystemConsumer register with partition: {} and offset {}", systemStreamPartition, offset);
        super.register(systemStreamPartition, offset);
        List<String> partitionDescriptor = this.getPartitionDescriptor(systemStreamPartition);
        MultiFileHdfsReader reader = new MultiFileHdfsReader(this.readerType, systemStreamPartition, partitionDescriptor, offset, this.numMaxRetries);
        this.readers.put(systemStreamPartition, reader);
        this.consumerMetrics.registerSystemStreamPartition(systemStreamPartition);
    }

    /**
     * Restarts the reader task of every requested partition whose previous task
     * ended with an exception, then delegates to the superclass poll.
     *
     * @throws InterruptedException if interrupted while inspecting a reader task
     *                              or while the superclass poll is waiting
     */
    @Override
    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
        for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
            this.restartReaderIfFailed(systemStreamPartition);
        }
        return super.poll(systemStreamPartitions, timeout);
    }

    /**
     * If the partition's reader task has finished with an exception, reconnects
     * the reader and submits a fresh {@link ReaderRunnable} for it. Does nothing
     * when no task was ever submitted for the partition, when the task is still
     * running, or when it completed normally.
     */
    private void restartReaderIfFailed(SystemStreamPartition systemStreamPartition) throws InterruptedException {
        Future<?> status = this.readerRunnableStatus.get(systemStreamPartition);
        // A partition polled before start() (or never registered) has no task yet.
        if (status == null || !status.isDone()) {
            return;
        }
        try {
            // The task is done, so get() only surfaces the failure, if there was one.
            status.get();
        }
        catch (ExecutionException e) {
            MultiFileHdfsReader reader = this.readers.get(systemStreamPartition);
            LOG.warn("Detect failure in ReaderRunnable for ssp: {}. Try to reconnect now.", systemStreamPartition, e);
            reader.reconnect();
            this.readerRunnableStatus.put(systemStreamPartition, this.executorService.submit(new ReaderRunnable(reader)));
        }
    }

    /**
     * Puts one envelope on the partition's queue via the superclass.
     *
     * @throws SamzaException if the calling thread is interrupted while putting;
     *                        the interrupt flag is restored and the original
     *                        exception is attached as the cause
     */
    private void offerMessage(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
        try {
            super.put(systemStreamPartition, envelope);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition, e);
        }
    }

    /**
     * Body of a reader task: forwards envelopes from the reader to the partition's
     * queue until the reader has nothing left or shutdown is requested, updating
     * the event counters per envelope. When the loop exits, for either reason, an
     * end-of-stream envelope is enqueued and the reader is closed.
     */
    private void doPoll(MultiFileHdfsReader reader) {
        SystemStreamPartition systemStreamPartition = reader.getSystemStreamPartition();
        while (reader.hasNext() && !this.isShutdown) {
            IncomingMessageEnvelope messageEnvelope = reader.readNext();
            this.offerMessage(systemStreamPartition, messageEnvelope);
            this.consumerMetrics.incNumEvents(systemStreamPartition);
            this.consumerMetrics.incTotalNumEvents();
        }
        this.offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
        reader.close();
    }

    /**
     * Task that runs {@link #doPoll} for a single reader.
     */
    private class ReaderRunnable implements Runnable {
        private final MultiFileHdfsReader reader;

        ReaderRunnable(MultiFileHdfsReader reader) {
            this.reader = reader;
        }

        @Override
        public void run() {
            HdfsSystemConsumer.this.doPoll(this.reader);
        }
    }

    /**
     * Holds the counters of this consumer: one "num-events-&lt;partition&gt;" counter
     * per partition and a single "num-total-events" counter, all created in the
     * supplied {@link MetricsRegistry} under the consumer's class name as group.
     */
    public static class HdfsSystemConsumerMetrics {
        private final MetricsRegistry metricsRegistry;
        private final Map<SystemStreamPartition, Counter> numEventsCounterMap;
        private final Counter numTotalEventsCounter;

        /**
         * @param metricsRegistry registry in which all counters are created
         */
        public HdfsSystemConsumerMetrics(MetricsRegistry metricsRegistry) {
            this.metricsRegistry = metricsRegistry;
            this.numEventsCounterMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
            this.numTotalEventsCounter = metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-total-events");
        }

        /**
         * Ensures a per-partition event counter exists; an already mapped counter
         * is left untouched.
         */
        public void registerSystemStreamPartition(SystemStreamPartition systemStreamPartition) {
            this.counterFor(systemStreamPartition);
        }

        /**
         * Increments the partition's event counter, creating it first if needed.
         */
        public void incNumEvents(SystemStreamPartition systemStreamPartition) {
            this.counterFor(systemStreamPartition).inc();
        }

        /**
         * Increments the counter of events across all partitions.
         */
        public void incTotalNumEvents() {
            this.numTotalEventsCounter.inc();
        }

        /**
         * @return the registry this holder was constructed with
         */
        public MetricsRegistry getMetricsRegistry() {
            return this.metricsRegistry;
        }

        /**
         * Returns the counter mapped to the partition, asking the registry for a
         * new one only when none is mapped yet.
         */
        private Counter counterFor(SystemStreamPartition systemStreamPartition) {
            return this.numEventsCounterMap.computeIfAbsent(systemStreamPartition, ssp -> this.metricsRegistry.newCounter(METRICS_GROUP_NAME, "num-events-" + ssp));
        }
    }
}

