/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.gcp.pubsub.bind;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriberInterface;
import com.google.pubsub.v1.ProjectSubscriptionName;
import io.micronaut.context.BeanContext;
import io.micronaut.gcp.pubsub.bind.SubscriberFactory;
import io.micronaut.gcp.pubsub.bind.SubscriberFactoryConfig;
import io.micronaut.gcp.pubsub.configuration.SubscriberConfigurationProperties;
import io.micronaut.gcp.pubsub.exception.PubSubListenerException;
import io.micronaut.inject.qualifiers.Qualifiers;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class DefaultSubscriberFactory
implements SubscriberFactory,
AutoCloseable {
    private final ConcurrentHashMap<ProjectSubscriptionName, Subscriber> subscribers = new ConcurrentHashMap();
    private final TransportChannelProvider transportChannelProvider;
    private final CredentialsProvider credentialsProvider;
    private final BeanContext beanContext;
    private final Logger logger = LoggerFactory.getLogger(DefaultSubscriberFactory.class);

    public DefaultSubscriberFactory(@Named(value="pubsub") TransportChannelProvider transportChannelProvider, @Named(value="pubsub") CredentialsProvider credentialsProvider, BeanContext beanContext) {
        this.transportChannelProvider = transportChannelProvider;
        this.credentialsProvider = credentialsProvider;
        this.beanContext = beanContext;
    }

    public Subscriber createSubscriber(SubscriberFactoryConfig config) {
        Subscriber subscriber = this.subscribers.compute(config.getSubscriptionName(), (k, v) -> {
            if (v == null) {
                Subscriber.Builder builder = Subscriber.newBuilder((ProjectSubscriptionName)config.getSubscriptionName(), (MessageReceiver)config.getReceiver()).setChannelProvider(this.transportChannelProvider).setCredentialsProvider(this.credentialsProvider);
                Optional subscriberConfiguration = this.beanContext.findBean(SubscriberConfigurationProperties.class, Qualifiers.byName((String)config.getSubscriberConfiguration()));
                String executor = subscriberConfiguration.map(s -> s.getExecutor()).orElse(config.getDefaultExecutor());
                ExecutorService executorService = (ExecutorService)this.beanContext.getBean(ExecutorService.class, Qualifiers.byName((String)executor));
                if (!(executorService instanceof ScheduledExecutorService)) {
                    throw new IllegalStateException("Invalid Executor type provided, please make sure you have a ScheduledExecutorService configured for Subscriber: " + config.getSubscriptionName().getSubscription());
                }
                builder.setExecutorProvider((ExecutorProvider)FixedExecutorProvider.create((ScheduledExecutorService)((ScheduledExecutorService)executorService)));
                if (subscriberConfiguration.isPresent()) {
                    SubscriberConfigurationProperties properties = (SubscriberConfigurationProperties)subscriberConfiguration.get();
                    builder.setMaxAckExtensionPeriod(properties.getMaxAckExtensionPeriod());
                    builder.setParallelPullCount(properties.getParallelPullCount().intValue());
                    builder.setMaxDurationPerAckExtension(properties.getMaxDurationPerAckExtension());
                    builder.setFlowControlSettings(properties.getFlowControlSettings().build());
                }
                return builder.build();
            }
            throw new PubSubListenerException(String.format("Subscription %s is already registered for another method", config.getSubscriptionName().toString()));
        });
        subscriber.startAsync();
        return subscriber;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @PreDestroy
    public void close() throws Exception {
        while (!this.subscribers.entrySet().isEmpty()) {
            Iterator<Map.Entry<ProjectSubscriptionName, Subscriber>> it = this.subscribers.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<ProjectSubscriptionName, Subscriber> entry = it.next();
                SubscriberInterface subscriber = (SubscriberInterface)entry.getValue();
                try {
                    subscriber.stopAsync().awaitTerminated();
                }
                catch (Exception e) {
                    this.logger.error("Failed stopping subscriber for " + entry.getKey(), (Throwable)e);
                }
                finally {
                    it.remove();
                }
            }
        }
    }
}

