/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.pulsar;

import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.component.pulsar.PulsarEndpoint;
import org.apache.camel.component.pulsar.PulsarMessageListener;
import org.apache.camel.component.pulsar.utils.PulsarUtils;
import org.apache.camel.component.pulsar.utils.consumers.ConsumerCreationStrategy;
import org.apache.camel.component.pulsar.utils.consumers.ConsumerCreationStrategyFactory;
import org.apache.camel.support.DefaultConsumer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarConsumer
extends DefaultConsumer
implements Suspendable {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class);
    private final PulsarEndpoint pulsarEndpoint;
    private final ConsumerCreationStrategyFactory consumerCreationStrategyFactory;
    private Queue<Consumer<byte[]>> pulsarConsumers;
    private Queue<ExecutorService> executors;

    public PulsarConsumer(PulsarEndpoint pulsarEndpoint, Processor processor) {
        super((Endpoint)pulsarEndpoint, processor);
        this.pulsarEndpoint = pulsarEndpoint;
        this.pulsarConsumers = new ConcurrentLinkedQueue<Consumer<byte[]>>();
        this.consumerCreationStrategyFactory = ConsumerCreationStrategyFactory.create(this);
        this.executors = new ConcurrentLinkedQueue<ExecutorService>();
    }

    protected void doStart() throws Exception {
        this.pulsarConsumers = PulsarUtils.stopConsumers(this.pulsarConsumers);
        Collection<Consumer<byte[]>> consumers = this.createConsumers(this.pulsarEndpoint, this.consumerCreationStrategyFactory);
        if (!this.pulsarEndpoint.getPulsarConfiguration().isMessageListener()) {
            this.executors.addAll(this.subscribeWithThreadPool(consumers, this.pulsarEndpoint));
        }
        this.pulsarConsumers.addAll(consumers);
    }

    protected void doStop() throws PulsarClientException {
        this.executors = PulsarUtils.stopExecutors(this.pulsarEndpoint.getCamelContext().getExecutorServiceManager(), this.executors);
        this.pulsarConsumers = PulsarUtils.stopConsumers(this.pulsarConsumers);
    }

    protected void doSuspend() {
        PulsarUtils.pauseConsumers(this.pulsarConsumers);
    }

    protected void doResume() throws Exception {
        PulsarUtils.resumeConsumers(this.pulsarConsumers);
    }

    private Collection<Consumer<byte[]>> createConsumers(PulsarEndpoint endpoint, ConsumerCreationStrategyFactory factory) throws Exception {
        ConsumerCreationStrategy strategy = factory.getStrategy(endpoint.getPulsarConfiguration().getSubscriptionType());
        return strategy.create(endpoint);
    }

    private Collection<ExecutorService> subscribeWithThreadPool(Collection<Consumer<byte[]>> consumers, PulsarEndpoint endpoint) {
        int numThreads = endpoint.getPulsarConfiguration().getNumberOfConsumerThreads();
        return consumers.stream().map(consumer -> {
            ExecutorService executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool((Object)this, "pulsar-consumer", numThreads);
            for (int i = 0; i < numThreads; ++i) {
                executor.submit(new PulsarConsumerLoop(endpoint, (Consumer<byte[]>)consumer));
            }
            return executor;
        }).collect(Collectors.toList());
    }

    private class PulsarConsumerLoop
    implements Runnable {
        private final PulsarEndpoint endpoint;
        private final Consumer<byte[]> consumer;

        public PulsarConsumerLoop(PulsarEndpoint endpoint, Consumer<byte[]> consumer) {
            this.endpoint = endpoint;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            PulsarMessageListener listener = new PulsarMessageListener(this.endpoint, PulsarConsumer.this);
            while (true) {
                try {
                    while (true) {
                        Message msg = this.consumer.receive();
                        listener.received(this.consumer, (Message<byte[]>)msg);
                    }
                }
                catch (PulsarClientException e) {
                    if (e.getCause() instanceof InterruptedException) {
                        LOGGER.info("Received shutdown signal, exiting");
                        break;
                    }
                    this.endpoint.getExceptionHandler().handleException((Throwable)e);
                    continue;
                }
                catch (Exception e) {
                    this.endpoint.getExceptionHandler().handleException((Throwable)e);
                    continue;
                }
                break;
            }
        }
    }
}

