/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.gcp.pubsub;

import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.reactive.messaging.gcp.pubsub.PubSubConfig;
import io.smallrye.reactive.messaging.gcp.pubsub.PubSubMessageReceiver;
import io.smallrye.reactive.messaging.gcp.pubsub.i18n.PubSubExceptions;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.microprofile.reactive.messaging.Message;

/**
 * Application-scoped holder for the Google Cloud Pub/Sub resources used by the connector.
 *
 * <p>Publishers and admin clients are built lazily and cached per {@link PubSubConfig}
 * (the config object is used as the map key, so caching presumably relies on its
 * {@code equals}/{@code hashCode} - verify against {@code PubSubConfig}). Subscribers are
 * started per emitter and stopped when that emitter terminates. Everything that was created
 * is shut down in {@link #destroy()}.
 */
@ApplicationScoped
public class PubSubManager {
    /** Upper bound, in {@link #SHUTDOWN_TIMEOUT_UNIT}, for each individual wait-for-termination call. */
    private static final long SHUTDOWN_TIMEOUT = 2L;
    private static final TimeUnit SHUTDOWN_TIMEOUT_UNIT = TimeUnit.SECONDS;

    private final Map<PubSubConfig, Publisher> publishers = new ConcurrentHashMap<>();
    private final Map<PubSubConfig, TopicAdminClient> topicAdminClients = new ConcurrentHashMap<>();
    private final Map<PubSubConfig, SubscriptionAdminClient> subscriptionAdminClients = new ConcurrentHashMap<>();
    private final List<MultiEmitter<? super Message<?>>> emitters = new CopyOnWriteArrayList<>();
    private final List<ManagedChannel> channels = new CopyOnWriteArrayList<>();

    /**
     * Returns the publisher associated with the given configuration, building it on first use.
     *
     * @param config the channel configuration (project id, topic, credentials, mock settings)
     * @return the cached or newly built publisher
     */
    public Publisher publisher(PubSubConfig config) {
        return publishers.computeIfAbsent(config, this::buildPublisher);
    }

    /**
     * Builds and starts a subscriber that feeds received messages to the given emitter.
     *
     * <p>When the emitter terminates, the subscriber is asked to stop and this method's
     * callback waits a bounded time for it to do so. The emitter is also remembered so that
     * {@link #destroy()} can complete it.
     *
     * @param config the channel configuration (project id, subscription, credentials, mock settings)
     * @param emitter the emitter receiving the messages
     */
    public void subscriber(PubSubConfig config, MultiEmitter<? super Message<?>> emitter) {
        Subscriber subscriber = buildSubscriber(config, new PubSubMessageReceiver(emitter));
        emitter.onTermination(() -> {
            subscriber.stopAsync();
            try {
                subscriber.awaitTerminated(SHUTDOWN_TIMEOUT, SHUTDOWN_TIMEOUT_UNIT);
            } catch (TimeoutException ignored) {
                // Best effort: stopAsync() has already been requested, so do not fail the
                // termination callback just because the subscriber was slow to stop.
            }
        });
        subscriber.startAsync();
        emitters.add(emitter);
    }

    /**
     * Returns the subscription admin client for the given configuration, building it on first use.
     *
     * @param config the channel configuration
     * @return the cached or newly built client
     */
    public SubscriptionAdminClient subscriptionAdminClient(PubSubConfig config) {
        return subscriptionAdminClients.computeIfAbsent(config, this::buildSubscriptionAdminClient);
    }

    /**
     * Returns the topic admin client for the given configuration, building it on first use.
     *
     * @param config the channel configuration
     * @return the cached or newly built client
     */
    public TopicAdminClient topicAdminClient(PubSubConfig config) {
        return topicAdminClients.computeIfAbsent(config, this::buildTopicAdminClient);
    }

    /**
     * Releases every resource created by this manager: admin clients, publishers, emitters
     * (completed, which triggers their termination callbacks) and finally the gRPC channels
     * created for mocked topics. Each shutdown waits a bounded time for termination.
     */
    @PreDestroy
    public void destroy() {
        topicAdminClients.values().forEach(PubSubManager::shutdown);
        topicAdminClients.clear();

        subscriptionAdminClients.values().forEach(PubSubManager::shutdown);
        subscriptionAdminClients.clear();

        publishers.values().forEach(PubSubManager::shutdownPublisher);
        publishers.clear();

        emitters.forEach(MultiEmitter::complete);
        emitters.clear();

        // Channels go last: the clients, publishers and subscribers above may have been built on them.
        channels.forEach(PubSubManager::shutdownChannel);
        channels.clear();
    }

    /**
     * Builds a subscription admin client, applying the credentials and transport channel
     * providers derived from the configuration when present.
     *
     * @throws RuntimeException as produced by {@code PubSubExceptions} when client creation fails with an I/O error
     */
    private SubscriptionAdminClient buildSubscriptionAdminClient(PubSubConfig config) {
        SubscriptionAdminSettings.Builder settingsBuilder = SubscriptionAdminSettings.newBuilder();
        buildCredentialsProvider(config).ifPresent(settingsBuilder::setCredentialsProvider);
        buildTransportChannelProvider(config).ifPresent(settingsBuilder::setTransportChannelProvider);
        try {
            return SubscriptionAdminClient.create(settingsBuilder.build());
        } catch (IOException e) {
            throw PubSubExceptions.ex.illegalStateUnableToBuildSubscriptionAdminClient(e);
        }
    }

    /**
     * Builds a topic admin client, applying the credentials and transport channel
     * providers derived from the configuration when present.
     *
     * @throws RuntimeException as produced by {@code PubSubExceptions} when client creation fails with an I/O error
     */
    private TopicAdminClient buildTopicAdminClient(PubSubConfig config) {
        TopicAdminSettings.Builder settingsBuilder = TopicAdminSettings.newBuilder();
        buildCredentialsProvider(config).ifPresent(settingsBuilder::setCredentialsProvider);
        buildTransportChannelProvider(config).ifPresent(settingsBuilder::setTransportChannelProvider);
        try {
            return TopicAdminClient.create(settingsBuilder.build());
        } catch (IOException e) {
            throw PubSubExceptions.ex.illegalStateUnableToBuildTopicAdminClient(e);
        }
    }

    /**
     * Builds a publisher for the project/topic named in the configuration.
     *
     * @throws RuntimeException as produced by {@code PubSubExceptions} when the publisher cannot be built
     */
    private Publisher buildPublisher(PubSubConfig config) {
        ProjectTopicName topicName = ProjectTopicName.of(config.getProjectId(), config.getTopic());
        try {
            Publisher.Builder publisherBuilder = Publisher.newBuilder(topicName);
            buildCredentialsProvider(config).ifPresent(publisherBuilder::setCredentialsProvider);
            buildTransportChannelProvider(config).ifPresent(publisherBuilder::setChannelProvider);
            return publisherBuilder.build();
        } catch (IOException e) {
            throw PubSubExceptions.ex.illegalStateUnableToBuildPublisher(e);
        }
    }

    /**
     * Builds (but does not start) a subscriber for the project/subscription named in the
     * configuration, delivering messages to the given receiver.
     */
    private Subscriber buildSubscriber(PubSubConfig config, PubSubMessageReceiver messageReceiver) {
        ProjectSubscriptionName subscriptionName =
                ProjectSubscriptionName.of(config.getProjectId(), config.getSubscription());
        Subscriber.Builder subscriberBuilder = Subscriber.newBuilder(subscriptionName, messageReceiver);
        buildCredentialsProvider(config).ifPresent(subscriberBuilder::setCredentialsProvider);
        buildTransportChannelProvider(config).ifPresent(subscriberBuilder::setChannelProvider);
        return subscriberBuilder.build();
    }

    /**
     * Returns a fixed transport channel provider backed by a new plaintext channel when topics
     * are mocked; otherwise empty, leaving the client library's default transport in place.
     *
     * <p>Note that every call in mock mode creates (and registers for shutdown) a new channel.
     */
    private Optional<TransportChannelProvider> buildTransportChannelProvider(PubSubConfig config) {
        if (!config.isMockPubSubTopics()) {
            return Optional.empty();
        }
        return Optional.of(FixedTransportChannelProvider.create(GrpcTransportChannel.create(buildChannel(config))));
    }

    /**
     * Chooses the credentials provider for the configuration: no credentials when topics are
     * mocked, service-account credentials read from the configured credential path when one is
     * set, otherwise empty so the client library's default applies.
     *
     * @throws IllegalStateException if the credential file cannot be read
     */
    private static Optional<CredentialsProvider> buildCredentialsProvider(PubSubConfig config) {
        if (config.isMockPubSubTopics()) {
            return Optional.of(NoCredentialsProvider.create());
        }
        if (config.getCredentialPath() != null) {
            // try-with-resources: the stream is only needed while the key file is parsed and
            // must be closed afterwards to avoid leaking a file handle per built client.
            try (InputStream credentialStream = Files.newInputStream(config.getCredentialPath())) {
                return Optional.of(
                        FixedCredentialsProvider.create(ServiceAccountCredentials.fromStream(credentialStream)));
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }
        return Optional.empty();
    }

    /**
     * Creates a plaintext gRPC channel to the configured host/port and records it so that
     * {@link #destroy()} can shut it down.
     */
    private ManagedChannel buildChannel(PubSubConfig config) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress(config.getHost(), config.getPort())
                .usePlaintext()
                .build();
        channels.add(channel);
        return channel;
    }

    /** Shuts a background resource down and waits a bounded time for it to terminate. */
    private static void shutdown(BackgroundResource resource) {
        try {
            resource.shutdown();
            resource.awaitTermination(SHUTDOWN_TIMEOUT, SHUTDOWN_TIMEOUT_UNIT);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /** Shuts a publisher down and waits a bounded time for it to terminate. */
    private static void shutdownPublisher(Publisher publisher) {
        try {
            publisher.shutdown();
            publisher.awaitTermination(SHUTDOWN_TIMEOUT, SHUTDOWN_TIMEOUT_UNIT);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Shuts a gRPC channel down and waits a bounded time for it to terminate. */
    private static void shutdownChannel(ManagedChannel channel) {
        try {
            channel.shutdown();
            channel.awaitTermination(SHUTDOWN_TIMEOUT, SHUTDOWN_TIMEOUT_UNIT);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

