/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.google.pubsub;

import com.google.api.core.AbstractApiService;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.base.Strings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GooglePubsubConsumer
extends DefaultConsumer {
    private Logger localLog;
    private final GooglePubsubEndpoint endpoint;
    private final Processor processor;
    private ExecutorService executor;
    private List<Subscriber> subscribers;

    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) throws Exception {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        this.subscribers = new LinkedList<Subscriber>();
        String loggerId = endpoint.getLoggerId();
        if (Strings.isNullOrEmpty((String)loggerId)) {
            loggerId = ((Object)((Object)this)).getClass().getName();
        }
        this.localLog = LoggerFactory.getLogger((String)loggerId);
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.localLog.info("Starting Google PubSub consumer for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
        this.executor = this.endpoint.createExecutor();
        for (int i = 0; i < this.endpoint.getConcurrentConsumers(); ++i) {
            this.executor.submit(new SubscriberWrapper());
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.localLog.info("Stopping Google PubSub consumer for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
        if (this.subscribers != null && !this.subscribers.isEmpty()) {
            this.localLog.info("Stopping subscribers for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
            this.subscribers.forEach(AbstractApiService::stopAsync);
        }
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
    }

    private class SubscriberWrapper
    implements Runnable {
        private final String subscriptionName;

        SubscriberWrapper() {
            this.subscriptionName = ProjectSubscriptionName.format((String)GooglePubsubConsumer.this.endpoint.getProjectId(), (String)GooglePubsubConsumer.this.endpoint.getDestinationName());
        }

        @Override
        public void run() {
            try {
                if (GooglePubsubConsumer.this.localLog.isDebugEnabled()) {
                    GooglePubsubConsumer.this.localLog.debug("Subscribing to {}", (Object)this.subscriptionName);
                }
                if (GooglePubsubConsumer.this.endpoint.isSynchronousPull()) {
                    this.synchronousPull(this.subscriptionName);
                } else {
                    this.asynchronousPull(this.subscriptionName);
                }
                GooglePubsubConsumer.this.localLog.debug("Exit run for subscription {}", (Object)this.subscriptionName);
            }
            catch (Exception e) {
                GooglePubsubConsumer.this.localLog.error("Failure getting messages from PubSub", (Throwable)e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void asynchronousPull(String subscriptionName) {
            while (GooglePubsubConsumer.this.isRunAllowed() && !GooglePubsubConsumer.this.isSuspendingOrSuspended()) {
                CamelMessageReceiver messageReceiver = new CamelMessageReceiver(GooglePubsubConsumer.this.endpoint, GooglePubsubConsumer.this.processor);
                Subscriber subscriber = GooglePubsubConsumer.this.endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver);
                try {
                    GooglePubsubConsumer.this.subscribers.add(subscriber);
                    subscriber.startAsync().awaitRunning();
                    subscriber.awaitTerminated();
                }
                catch (Exception e) {
                    GooglePubsubConsumer.this.localLog.error("Failure getting messages from PubSub", (Throwable)e);
                }
                finally {
                    GooglePubsubConsumer.this.localLog.debug("Stopping async subscriber {}", (Object)subscriptionName);
                    subscriber.stopAsync();
                }
            }
        }

        private void synchronousPull(String subscriptionName) {
            while (GooglePubsubConsumer.this.isRunAllowed() && !GooglePubsubConsumer.this.isSuspendingOrSuspended()) {
                try {
                    SubscriberStub subscriber = GooglePubsubConsumer.this.endpoint.getComponent().getSubscriberStub();
                    Throwable throwable = null;
                    try {
                        PullRequest pullRequest = PullRequest.newBuilder().setMaxMessages(GooglePubsubConsumer.this.endpoint.getMaxMessagesPerPoll().intValue()).setReturnImmediately(false).setSubscription(subscriptionName).build();
                        PullResponse pullResponse = (PullResponse)subscriber.pullCallable().call((Object)pullRequest);
                        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
                            PubsubMessage pubsubMessage = message.getMessage();
                            Exchange exchange = GooglePubsubConsumer.this.endpoint.createExchange();
                            exchange.getIn().setBody((Object)pubsubMessage.getData().toByteArray());
                            exchange.getIn().setHeader("CamelGooglePubsub.MsgAckId", (Object)message.getAckId());
                            exchange.getIn().setHeader("CamelGooglePubsub.MessageId", (Object)pubsubMessage.getMessageId());
                            exchange.getIn().setHeader("CamelGooglePubsub.PublishTime", (Object)pubsubMessage.getPublishTime());
                            if (null != pubsubMessage.getAttributesMap()) {
                                exchange.getIn().setHeader("CamelGooglePubsub.Attributes", (Object)pubsubMessage.getAttributesMap());
                            }
                            if (GooglePubsubConsumer.this.endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
                                ((ExtendedExchange)exchange.adapt(ExtendedExchange.class)).addOnCompletion((Synchronization)new AcknowledgeSync(subscriber, subscriptionName));
                            }
                            try {
                                GooglePubsubConsumer.this.processor.process(exchange);
                            }
                            catch (Exception e) {
                                exchange.setException((Throwable)e);
                            }
                        }
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (subscriber == null) continue;
                        if (throwable != null) {
                            try {
                                subscriber.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        subscriber.close();
                    }
                }
                catch (IOException e) {
                    GooglePubsubConsumer.this.localLog.error("Failure getting messages from PubSub", (Throwable)e);
                }
            }
        }
    }
}

