/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eventbus.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.eventbus.impl.EventBatch;
import com.netflix.eventbus.impl.EventBusImpl;
import com.netflix.eventbus.impl.EventConsumerStats;
import com.netflix.eventbus.spi.EventFilter;
import com.netflix.eventbus.spi.Subscribe;
import com.netflix.eventbus.spi.SubscriberConfigProvider;
import com.netflix.eventbus.spi.SyncSubscribersGatekeeper;
import com.netflix.eventbus.utils.EventBusUtils;
import com.netflix.servo.monitor.Stopwatch;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Binds one subscriber method to an event queue and a dedicated poller thread.
 *
 * <p>Events handed to {@link #enqueue(Object)} are either dispatched synchronously on the caller's
 * thread (when {@link SyncSubscribersGatekeeper} says so) or offered to the consumer queue, from
 * which the poller thread takes them and invokes the subscriber method reflectively. The filters
 * attached to this consumer are applied before every dispatch; for batches they are applied per
 * batch element while the subscriber iterates.
 *
 * <p>The filter set is a {@link CopyOnWriteArraySet}, so filters may be added or removed while the
 * poller thread is reading them.
 */
class EventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class);
    /** Upper bound on how many times a failed queue offer is retried in {@link #enqueue(Object)}. */
    private static final DynamicIntProperty maxRetriesOnQueueFull = DynamicPropertyFactory.getInstance().getIntProperty("eventbus.consumer.queuefull.maxretries", 5);
    private final Class<?> targetEventClass;
    private final Method delegateSubscriber;
    private final Object subscriberClassInstance;
    private final CopyOnWriteArraySet<EventFilter> filters;
    private final EventBusImpl.ConsumerQueueSupplier.ConsumerQueue eventQueue;
    private final EventPoller poller;
    private final Thread pollerThread;
    private final Subscribe.BatchingStrategy batchingStrategy;
    private final EventConsumerStats stats;
    private final SubscriberConfigProvider.SubscriberConfig subscriberConfig;

    /**
     * Creates the consumer and starts its poller thread.
     *
     * @param subscriber the subscriber method; must be declared by the exact runtime class of
     *                   {@code subscriberClassInstance}
     * @param subscriberClassInstance the object on which {@code subscriber} is invoked
     * @param filter an optional initial filter; {@code null} means no filter is attached
     * @param targetEventType the event type this consumer is registered for
     * @param queueSupplier supplies the queue that buffers events for the poller thread
     * @throws IllegalArgumentException if {@code subscriber} is not declared by the class of
     *                                  {@code subscriberClassInstance}
     */
    EventConsumer(Method subscriber, Object subscriberClassInstance, @Nullable EventFilter filter, Class<?> targetEventType, EventBusImpl.ConsumerQueueSupplier queueSupplier) {
        Preconditions.checkArgument(subscriber.getDeclaringClass() == subscriberClassInstance.getClass(), "The subscriber method does not belong to the subscriber class.");
        this.delegateSubscriber = subscriber;
        this.subscriberClassInstance = subscriberClassInstance;
        this.targetEventClass = targetEventType;
        this.stats = new EventConsumerStats(subscriberClassInstance.getClass().getName() + "_" + this.delegateSubscriber.getName() + "_" + this.targetEventClass.getName(), EventBusImpl.STATS_COLLECTION_DURATION_MILLIS.get());
        this.subscriberConfig = EventBusUtils.getSubscriberConfig(subscriber, subscriberClassInstance);
        this.batchingStrategy = this.subscriberConfig.getBatchingStrategy();
        this.eventQueue = queueSupplier.get(this.delegateSubscriber, this.subscriberConfig, this.stats.QUEUE_SIZE_COUNTER);
        if (null != filter) {
            this.filters = new CopyOnWriteArraySet<EventFilter>(Arrays.asList(filter));
        } else {
            this.filters = new CopyOnWriteArraySet<EventFilter>();
        }
        // Every field the poller thread reads (including filters) must be assigned before the
        // thread is started; otherwise the running thread could observe a half-built consumer.
        this.poller = new EventPoller();
        this.pollerThread = new Thread(this.poller);
        this.pollerThread.start();
    }

    /**
     * Hands an event to this consumer.
     *
     * <p>If the gatekeeper classifies the subscriber as synchronous, the event is processed
     * immediately on the calling thread. Otherwise it is offered to the consumer queue. When an
     * offer fails, one entry is taken from the queue to make room and the offer is repeated, up to
     * {@code eventbus.consumer.queuefull.maxretries} times. If the event still cannot be queued it
     * is dropped and the rejected counter is incremented.
     *
     * @param event the event to deliver; must not be {@code null}
     */
    void enqueue(Object event) {
        // NOTE(review): delegateSubscriber.getClass() is always java.lang.reflect.Method.class, not
        // the subscriber's class; subscriberClassInstance.getClass() may have been intended.
        // Left unchanged because the gatekeeper's contract is not visible here - confirm.
        if (SyncSubscribersGatekeeper.isSyncSubscriber(this.subscriberConfig, event.getClass(), this.delegateSubscriber.getClass())) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Sending a sync event to subscriber: %s. Set the property %s to false to disable sync consumption.", this.delegateSubscriber.toGenericString(), "eventbus.allow.sync.subscribers"));
            }
            this.processEvent(event);
            return;
        }
        Stopwatch start = this.stats.enqueueStats.start();
        try {
            int maxRetries = maxRetriesOnQueueFull.get();
            int retries = 0;
            // Track the outcome of the last offer explicitly so that an event accepted on the
            // final permitted retry is not mistaken for a rejection.
            boolean accepted = this.eventQueue.offer(event);
            while (!accepted && retries < maxRetries) {
                retries++;
                this.stats.QUEUE_OFFER_RETRY_COUNTER.increment();
                // Take one queued entry so the next offer has a chance to succeed.
                this.eventQueue.nonBlockingTake();
                LOGGER.info(String.format("Subscriber: %s queue full, rejected one %s as a result of retries.", this.delegateSubscriber.toGenericString(), Subscribe.BatchingStrategy.None == this.batchingStrategy ? "event" : "batch"));
                accepted = this.eventQueue.offer(event);
            }
            if (!accepted) {
                LOGGER.info(String.format("Subscriber: %s %s one event after %s retries.", this.delegateSubscriber.toGenericString(), "rejected", retries));
                this.stats.EVENT_ENQUEUE_REJECTED_COUNTER.increment();
            } else if (retries > 0) {
                LOGGER.info(String.format("Subscriber: %s %s one event after %s retries.", this.delegateSubscriber.toGenericString(), "accepted", retries));
            }
        }
        finally {
            start.stop();
        }
    }

    /** Attaches the given filters to this consumer. */
    void addFilters(EventFilter ... filters) {
        this.filters.addAll(Arrays.asList(filters));
    }

    /** Detaches the given filters from this consumer. */
    void removeFilters(EventFilter ... filters) {
        this.filters.removeAll(Arrays.asList(filters));
    }

    /** Detaches every filter from this consumer. */
    void clearFilters() {
        this.filters.clear();
    }

    /**
     * Signals the poller thread to stop (interrupting it if it is blocked), then clears the queue
     * and the filter set. Does not wait for the poller thread to terminate.
     */
    void shutdown() {
        this.poller.stop();
        this.eventQueue.clear();
        this.filters.clear();
    }

    /** Returns the subscriber method this consumer dispatches to. */
    Method getDelegateSubscriber() {
        return this.delegateSubscriber;
    }

    /** Returns the object on which the subscriber method is invoked. */
    Object getContainerInstance() {
        return this.subscriberClassInstance;
    }

    /** Returns the event type this consumer was created for. */
    Class<?> getTargetEventClass() {
        return this.targetEventClass;
    }

    /**
     * Returns the live filter set (not a copy); changes made through the returned set affect this
     * consumer.
     */
    Set<EventFilter> getAttachedFilters() {
        return this.filters;
    }

    @VisibleForTesting
    EventConsumerStats getStats() {
        return this.stats;
    }

    @VisibleForTesting
    SubscriberConfigProvider.SubscriberConfig getSubscriberConfig() {
        return this.subscriberConfig;
    }

    /**
     * Applies the filters to the event and, if it passes, invokes the subscriber method with it.
     * Batches are wrapped in a {@link BatchDecorator} first so that filtering happens per element
     * during iteration. Any exception raised by the invocation is logged and the event is dropped.
     *
     * <p>NOTE(review): the consumption stopwatch is stopped only when the event passes the
     * filters, so filtered-out events never record a consumption timing - confirm this is intended.
     */
    private void processEvent(Object event) {
        Stopwatch start = this.stats.consumptionStats.start();
        Object toDispatch = this.wrapIfBatched(event);
        if (!this.applyFilters(toDispatch)) {
            return;
        }
        try {
            this.delegateSubscriber.invoke(this.subscriberClassInstance, toDispatch);
        }
        catch (Exception e) {
            LOGGER.error("Failed to dispatch event: " + toDispatch + " to subscriber class: " + this.subscriberClassInstance.getClass() + " and method: " + this.delegateSubscriber.toGenericString() + ". Ignoring the event.", e);
        }
        finally {
            start.stop();
        }
    }

    /**
     * Returns whether the event should be dispatched. Anything {@link EventBusUtils} recognises as
     * an event batch always passes here; other events are run through the attached filters.
     */
    private boolean applyFilters(Object event) {
        if (EventBusUtils.isAnEventBatch(event)) {
            return true;
        }
        return EventBusUtils.applyFilters(event, this.filters, this.stats.filterStats, "subscriber: " + this.delegateSubscriber.toGenericString(), LOGGER);
    }

    /** Wraps an event batch in a {@link BatchDecorator}; any other event is returned unchanged. */
    private Object wrapIfBatched(Object event) {
        if (EventBusUtils.isAnEventBatch(event)) {
            return new BatchDecorator((EventBatch)event);
        }
        return event;
    }

    /**
     * Two consumers are equal when they share the same subscriber method, subscriber instance and
     * current filter set. Because the filter set is mutable, the result (and {@link #hashCode()})
     * can change after filters are added or removed.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        EventConsumer other = (EventConsumer)o;
        if (this.delegateSubscriber != null ? !this.delegateSubscriber.equals(other.delegateSubscriber) : other.delegateSubscriber != null) {
            return false;
        }
        if (this.filters != null ? !this.filters.equals(other.filters) : other.filters != null) {
            return false;
        }
        return this.subscriberClassInstance != null ? this.subscriberClassInstance.equals(other.subscriberClassInstance) : other.subscriberClassInstance == null;
    }

    @Override
    public int hashCode() {
        int result = this.delegateSubscriber != null ? this.delegateSubscriber.hashCode() : 0;
        result = 31 * result + (this.subscriberClassInstance != null ? this.subscriberClassInstance.hashCode() : 0);
        result = 31 * result + (this.filters != null ? this.filters.hashCode() : 0);
        return result;
    }

    /**
     * Iterable view over an {@link EventBatch} whose iterator skips (and removes from the
     * underlying batch iterator) every element rejected by this consumer's filters.
     */
    class BatchDecorator
    implements Iterable {
        private final EventBatch batch;

        BatchDecorator(EventBatch batch) {
            this.batch = batch;
        }

        @Override
        public Iterator iterator() {
            return new BatchIterator(this.batch);
        }

        /**
         * Iterator that always keeps the underlying peeking iterator positioned on an element that
         * passes the filters, so {@link #hasNext()} is accurate with respect to filtering.
         */
        private class BatchIterator
        implements Iterator {
            private final PeekingIterator delegatePeekingIterator;

            BatchIterator(EventBatch batch) {
                this.delegatePeekingIterator = Iterators.peekingIterator(batch.iterator());
                this._ensureNextEventIsConsumable();
            }

            @Override
            public boolean hasNext() {
                return this.delegatePeekingIterator.hasNext();
            }

            @Override
            public Object next() {
                Object toReturn = this.delegatePeekingIterator.next();
                this._ensureNextEventIsConsumable();
                return toReturn;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("Event batch iterator does not support remove.");
            }

            /**
             * Advances past, and removes, leading elements that the filters reject, stopping at
             * the first element that passes or at the end of the batch. Written as a loop so that
             * a long run of rejected elements cannot exhaust the call stack.
             */
            private void _ensureNextEventIsConsumable() {
                while (this.delegatePeekingIterator.hasNext()) {
                    Object nextEvent = this.delegatePeekingIterator.peek();
                    if (EventBusUtils.applyFilters(nextEvent, EventConsumer.this.filters, EventConsumer.this.stats.filterStats, "subscriber: " + EventConsumer.this.delegateSubscriber.toGenericString(), LOGGER)) {
                        return;
                    }
                    // remove() is only called right after next(), i.e. with no pending peek.
                    this.delegatePeekingIterator.next();
                    this.delegatePeekingIterator.remove();
                }
            }
        }
    }

    /**
     * Loop run by the poller thread: takes events from the queue and processes them until
     * {@link #stop()} is called.
     */
    private class EventPoller
    implements Runnable {
        private volatile boolean stop;

        private EventPoller() {
        }

        /** Requests termination and interrupts the poller thread in case it is blocked on a take. */
        private void stop() {
            this.stop = true;
            EventConsumer.this.pollerThread.interrupt();
        }

        @Override
        public void run() {
            LOGGER.info("Event consumer: " + EventConsumer.this.delegateSubscriber.toGenericString() + " started.");
            while (!this.stop) {
                try {
                    Object event = EventConsumer.this.eventQueue.blockingTake();
                    if (null == event) continue;
                    EventConsumer.this.processEvent(event);
                }
                catch (InterruptedException e) {
                    // The interrupt flag is deliberately not restored: the loop exits via the
                    // stop flag, and re-interrupting would make the next blocking take fail at once.
                    LOGGER.info("Event consumer: " + EventConsumer.this.delegateSubscriber.toGenericString() + " interrupted. Can be the result of a stop call, if so, you will see a 'consumer stopped' log.");
                }
            }
            LOGGER.info("Event consumer: " + EventConsumer.this.delegateSubscriber.toGenericString() + " stopped.");
        }
    }
}

