/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.NoOpMetricsRegistry;

public abstract class BlockingEnvelopeMap
implements SystemConsumer {
    private final BlockingEnvelopeMapMetrics metrics;
    private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
    private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize;
    private final Map<SystemStreamPartition, Boolean> noMoreMessage;
    private final Clock clock;
    private volatile Throwable failureCause = null;

    public BlockingEnvelopeMap() {
        this(new NoOpMetricsRegistry());
    }

    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
        this(metricsRegistry, new Clock(){

            @Override
            public long currentTimeMillis() {
                return System.currentTimeMillis();
            }
        });
    }

    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
        this(metricsRegistry, clock, null);
    }

    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
        metricsGroupName = metricsGroupName == null ? this.getClass().getName() : metricsGroupName;
        this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
        this.bufferedMessages = new ConcurrentHashMap();
        this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
        this.clock = clock;
        this.bufferedMessagesSize = new ConcurrentHashMap();
    }

    @Override
    public void register(SystemStreamPartition systemStreamPartition, String offset) {
        this.initializeInternalStateForSSP(systemStreamPartition);
    }

    private void initializeInternalStateForSSP(SystemStreamPartition systemStreamPartition) {
        this.metrics.initMetrics(systemStreamPartition);
        this.bufferedMessages.putIfAbsent(systemStreamPartition, this.newBlockingQueue());
        this.bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0L));
    }

    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<IncomingMessageEnvelope>();
    }

    @Override
    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
        long stopTime = this.clock.currentTimeMillis() + timeout;
        HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
        this.metrics.incPoll();
        for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
            BlockingQueue<IncomingMessageEnvelope> queue = this.bufferedMessages.get(systemStreamPartition);
            ArrayList<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());
            if (queue.size() > 0) {
                queue.drainTo(outgoingList);
            } else if (timeout != 0L) {
                IncomingMessageEnvelope envelope = null;
                long timeRemaining = stopTime - this.clock.currentTimeMillis();
                if (timeout == -1L) {
                    while (envelope == null && !this.isAtHead(systemStreamPartition)) {
                        if (this.failureCause != null) {
                            String message = String.format("%s: Consumer has stopped.", this);
                            throw new SamzaException(message, this.failureCause);
                        }
                        this.metrics.incBlockingPoll(systemStreamPartition);
                        envelope = queue.poll(1000L, TimeUnit.MILLISECONDS);
                    }
                } else if (timeout > 0L && timeRemaining > 0L) {
                    this.metrics.incBlockingTimeoutPoll(systemStreamPartition);
                    envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
                }
                if (envelope != null) {
                    outgoingList.add(envelope);
                    queue.drainTo(outgoingList);
                }
            }
            if (outgoingList.size() <= 0) continue;
            messagesToReturn.put(systemStreamPartition, outgoingList);
            this.subtractSizeOnQDrain(systemStreamPartition, outgoingList);
        }
        return messagesToReturn;
    }

    private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> outgoingList) {
        long outgoingListBytes = 0L;
        for (IncomingMessageEnvelope envelope : outgoingList) {
            outgoingListBytes += (long)envelope.getSize();
        }
        this.bufferedMessagesSize.get(systemStreamPartition).addAndGet(-1L * outgoingListBytes);
    }

    protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
        this.bufferedMessages.get(systemStreamPartition).put(envelope);
        this.bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
    }

    protected void putAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
        BlockingQueue<IncomingMessageEnvelope> queue = this.bufferedMessages.get(systemStreamPartition);
        for (IncomingMessageEnvelope envelope : envelopes) {
            queue.put(envelope);
        }
    }

    public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
        BlockingQueue<IncomingMessageEnvelope> queue = this.bufferedMessages.get(systemStreamPartition);
        if (queue == null) {
            throw new NullPointerException("Attempting to get queue for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
        }
        return queue.size();
    }

    public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) {
        AtomicLong sizeInBytes = this.bufferedMessagesSize.get(systemStreamPartition);
        if (sizeInBytes == null) {
            throw new NullPointerException("Attempting to get size for " + systemStreamPartition + ", but the system/stream/partition was never registered. or fetch");
        }
        return sizeInBytes.get();
    }

    protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) {
        this.metrics.setNoMoreMessages(systemStreamPartition, isAtHead);
        return this.noMoreMessage.put(systemStreamPartition, isAtHead);
    }

    protected boolean isAtHead(SystemStreamPartition systemStreamPartition) {
        Boolean isAtHead = this.noMoreMessage.get(systemStreamPartition);
        return this.getNumMessagesInQueue(systemStreamPartition) == 0 && isAtHead != null && isAtHead.equals(true);
    }

    protected void setFailureCause(Throwable throwable) {
        this.failureCause = throwable;
    }

    public class BufferSizeGauge
    extends Gauge<Long> {
        private final SystemStreamPartition systemStreamPartition;

        public BufferSizeGauge(SystemStreamPartition systemStreamPartition, String name) {
            super(name, 0L);
            this.systemStreamPartition = systemStreamPartition;
        }

        @Override
        public Long getValue() {
            AtomicLong sizeInBytes = (AtomicLong)BlockingEnvelopeMap.this.bufferedMessagesSize.get(this.systemStreamPartition);
            if (sizeInBytes == null) {
                return 0L;
            }
            return sizeInBytes.get();
        }
    }

    public class BufferGauge
    extends Gauge<Integer> {
        private final SystemStreamPartition systemStreamPartition;

        public BufferGauge(SystemStreamPartition systemStreamPartition, String name) {
            super(name, 0);
            this.systemStreamPartition = systemStreamPartition;
        }

        @Override
        public Integer getValue() {
            Queue envelopes = (Queue)BlockingEnvelopeMap.this.bufferedMessages.get(this.systemStreamPartition);
            if (envelopes == null) {
                return 0;
            }
            return envelopes.size();
        }
    }

    public class BlockingEnvelopeMapMetrics {
        private final String group;
        private final MetricsRegistry metricsRegistry;
        private final ConcurrentHashMap<SystemStreamPartition, Gauge<Integer>> noMoreMessageGaugeMap;
        private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
        private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
        private final Counter pollCount;

        public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) {
            this.group = group;
            this.metricsRegistry = metricsRegistry;
            this.noMoreMessageGaugeMap = new ConcurrentHashMap();
            this.blockingPollCountMap = new ConcurrentHashMap();
            this.blockingPollTimeoutCountMap = new ConcurrentHashMap();
            this.pollCount = metricsRegistry.newCounter(group, "poll-count");
        }

        public void initMetrics(SystemStreamPartition systemStreamPartition) {
            this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newGauge(this.group, "no-more-messages-" + systemStreamPartition, 0));
            this.blockingPollCountMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newCounter(this.group, "blocking-poll-count-" + systemStreamPartition));
            this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newCounter(this.group, "blocking-poll-timeout-count-" + systemStreamPartition));
            this.metricsRegistry.newGauge(this.group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
            this.metricsRegistry.newGauge(this.group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
        }

        public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
            this.noMoreMessageGaugeMap.get(systemStreamPartition).set(noMoreMessages ? 1 : 0);
        }

        public void incBlockingPoll(SystemStreamPartition systemStreamPartition) {
            this.blockingPollCountMap.get(systemStreamPartition).inc();
        }

        public void incBlockingTimeoutPoll(SystemStreamPartition systemStreamPartition) {
            this.blockingPollTimeoutCountMap.get(systemStreamPartition).inc();
        }

        public void incPoll() {
            this.pollCount.inc();
        }
    }
}

