/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.gcp.pubsub;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.TopicName;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.gcp.pubsub.PubSubConfig;
import io.smallrye.reactive.messaging.gcp.pubsub.PubSubManager;
import io.smallrye.reactive.messaging.gcp.pubsub.PubSubMessage;
import io.smallrye.reactive.messaging.gcp.pubsub.PubSubSource;
import io.smallrye.reactive.messaging.gcp.pubsub.i18n.PubSubLogging;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.Destroyed;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.io.File;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(value="smallrye-gcp-pubsub")
public class PubSubConnector
implements InboundConnector,
OutboundConnector {
    static final String CONNECTOR_NAME = "smallrye-gcp-pubsub";
    @Inject
    @ConfigProperty(name="gcp-pubsub-project-id")
    private String projectId;
    @Inject
    @ConfigProperty(name="mock-pubsub-topics", defaultValue="false")
    private boolean mockPubSubTopics;
    @Inject
    @ConfigProperty(name="mock-pubsub-host")
    private Optional<String> host;
    @Inject
    @ConfigProperty(name="mock-pubsub-port")
    private Optional<Integer> port;
    @Inject
    private PubSubManager pubSubManager;
    private ExecutorService executorService;

    @PostConstruct
    public void initialize() {
        this.executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    public void destroy(@Observes @Destroyed(value=ApplicationScoped.class) Object context) {
        try {
            this.executorService.shutdown();
            this.executorService.awaitTermination(2L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        PubSubConfig pubSubConfig = new PubSubConfig(this.getProjectId(config), PubSubConnector.getTopic(config), PubSubConnector.getCredentialPath(config), PubSubConnector.getSubscription(config), this.mockPubSubTopics, this.host.orElse(null), this.port.orElse(null));
        return Multi.createFrom().uni(Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> {
            if (this.isUseAdminClient(config)) {
                PubSubLogging.log.adminClientEnabled();
                this.createTopic(pubSubConfig);
                this.createSubscription(pubSubConfig);
            }
            return pubSubConfig;
        }, this.executorService))).onItem().transformToMultiAndConcatenate(cfg -> Multi.createFrom().emitter((Consumer)new PubSubSource((PubSubConfig)cfg, this.pubSubManager)));
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        PubSubConfig pubSubConfig = new PubSubConfig(this.getProjectId(config), PubSubConnector.getTopic(config), PubSubConnector.getCredentialPath(config), this.mockPubSubTopics, this.host.orElse(null), this.port.orElse(null));
        return MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> {
            if (this.isUseAdminClient(config)) {
                PubSubLogging.log.adminClientEnabled();
                this.createTopic(pubSubConfig);
            }
            return (String)PubSubConnector.await(this.pubSubManager.publisher(pubSubConfig).publish(PubSubConnector.buildMessage(message)));
        }, this.executorService))));
    }

    private String getProjectId(Config config) {
        return config.getOptionalValue("project-id", String.class).orElse(this.projectId);
    }

    boolean isUseAdminClient(Config config) {
        return config.getOptionalValue("use-admin-client", Boolean.class).orElse(true);
    }

    private void createTopic(PubSubConfig config) {
        TopicAdminClient topicAdminClient = this.pubSubManager.topicAdminClient(config);
        TopicName topicName = TopicName.of((String)config.getProjectId(), (String)config.getTopic());
        try {
            topicAdminClient.getTopic(topicName);
        }
        catch (NotFoundException nf) {
            try {
                topicAdminClient.createTopic(topicName);
            }
            catch (AlreadyExistsException ae) {
                PubSubLogging.log.topicExistAlready(topicName, ae);
            }
        }
    }

    private void createSubscription(PubSubConfig config) {
        SubscriptionAdminClient subscriptionAdminClient = this.pubSubManager.subscriptionAdminClient(config);
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of((String)config.getProjectId(), (String)config.getSubscription());
        try {
            subscriptionAdminClient.getSubscription(subscriptionName);
        }
        catch (NotFoundException e) {
            PushConfig pushConfig = PushConfig.newBuilder().build();
            TopicName topicName = TopicName.of((String)config.getProjectId(), (String)config.getTopic());
            subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 0);
        }
    }

    private static String getTopic(Config config) {
        String topic = config.getOptionalValue("topic", String.class).orElse(null);
        if (topic != null) {
            return topic;
        }
        return (String)config.getValue("channel-name", String.class);
    }

    private static String getSubscription(Config config) {
        return (String)config.getValue("subscription", String.class);
    }

    private static Path getCredentialPath(Config config) {
        return config.getOptionalValue("credential-path", String.class).map(File::new).map(File::toPath).orElse(null);
    }

    private static PubsubMessage buildMessage(Message<?> message) {
        if (message instanceof PubSubMessage) {
            return ((PubSubMessage)message).getMessage();
        }
        if (message.getPayload() instanceof PubSubMessage) {
            return ((PubSubMessage)message.getPayload()).getMessage();
        }
        if (message.getPayload() instanceof PubsubMessage) {
            return (PubsubMessage)message.getPayload();
        }
        return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8((String)message.getPayload().toString())).build();
    }

    private static <T> T await(Future<T> future) {
        try {
            return future.get();
        }
        catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

