/*
 * Decompiled with CFR 0.152.
 */
package edu.stanford.protege.webprotege.ipc.pulsar;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.ProjectRequest;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.ipc.CommandExecutionException;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
import edu.stanford.protege.webprotege.ipc.ExecutionContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PreDestroy;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

public class PulsarCommandExecutor<Q extends Request<R>, R extends Response>
implements CommandExecutor<Q, R> {
    private static final Logger logger = LoggerFactory.getLogger(PulsarCommandExecutor.class);
    private final Class<R> responseClass;
    @Value(value="${spring.application.name}")
    private String applicationName;
    @Autowired
    private PulsarClient pulsarClient;
    @Autowired
    private PulsarAdmin pulsarAdmin;
    @Autowired
    private ObjectMapper objectMapper;
    private Producer<byte[]> producer;
    private Consumer<byte[]> consumer;
    private String requestChannel = null;
    private String replyChannel = null;
    private final Map<String, CompletableFuture<R>> replyHandlers = new ConcurrentHashMap<String, CompletableFuture<R>>();
    @Value(value="${webprotege.pulsar.tenant}")
    private String tenant;
    private String replySubscriptionName;

    public PulsarCommandExecutor(Class<R> responseClass) {
        this.responseClass = responseClass;
    }

    @Override
    public CompletableFuture<R> execute(Q request, ExecutionContext executionContext) {
        try {
            String replyChannel = this.getReplyChannelName(request);
            byte[] json = this.objectMapper.writeValueAsBytes(request);
            try {
                Producer<byte[]> producer = this.getProducer(request);
                String correlationId = UUID.randomUUID().toString();
                CompletableFuture replyFuture = new CompletableFuture();
                this.replyHandlers.put(correlationId, replyFuture);
                TypedMessageBuilder messageBuilder = producer.newMessage().value((Object)json).property("webprotege_correlationId", correlationId).property("webprotege_replyChannel", replyChannel).property("webprotege_userId", executionContext.userId().value());
                if (request instanceof ProjectRequest) {
                    String projectId = ((ProjectRequest)request).projectId().id();
                    messageBuilder.property("webprotege_projectId", projectId);
                    messageBuilder.key(projectId);
                }
                messageBuilder.send();
                return replyFuture;
            }
            catch (PulsarClientException e) {
                e.printStackTrace();
                return new CompletableFuture();
            }
        }
        catch (JsonProcessingException e) {
            logger.error("JSON Processing Exception");
            throw new UncheckedIOException((IOException)((Object)e));
        }
    }

    private synchronized Producer<byte[]> getProducer(Q request) {
        try {
            if (this.producer != null) {
                return this.producer;
            }
            this.ensureConsumerIsListeningForRepliesToRequest(request);
            if (this.requestChannel == null) {
                this.requestChannel = request.getChannel();
            }
            if (!this.requestChannel.equals(request.getChannel())) {
                throw new RuntimeException("Request channel is not the request channel that is in use by this CommandExecutor");
            }
            String topicName = "persistent://" + this.tenant + "/command-requests/" + this.requestChannel;
            String producerName = this.applicationName + "--CommandExecutor--" + request.getChannel();
            this.producer = this.pulsarClient.newProducer().producerName(producerName).topic(topicName).accessMode(ProducerAccessMode.Shared).create();
            return this.producer;
        }
        catch (PulsarClientException e) {
            throw new UncheckedIOException((IOException)((Object)e));
        }
    }

    private void ensureConsumerIsListeningForRepliesToRequest(Q request) {
        try {
            if (this.consumer != null) {
                return;
            }
            if (this.replyChannel == null) {
                this.replyChannel = this.getReplyChannelName(request);
            }
            if (!this.replyChannel.equals(this.getReplyChannelName(request))) {
                throw new RuntimeException("Reply channel is not the channel that is in use by this command executor");
            }
            String replyTopic = "persistent://" + this.tenant + "/command-responses/" + this.replyChannel;
            this.replySubscriptionName = this.applicationName + "--" + this.replyChannel + "--" + UUID.randomUUID();
            logger.info("Setting up consumer with subscription {} to listen for replies at {}", (Object)this.replySubscriptionName, (Object)replyTopic);
            this.consumer = this.pulsarClient.newConsumer().subscriptionName(this.replySubscriptionName).subscriptionType(SubscriptionType.Exclusive).topic(new String[]{replyTopic}).messageListener(this::handleReplyMessageReceived).subscribe();
        }
        catch (PulsarClientException e) {
            throw new UncheckedIOException((IOException)((Object)e));
        }
    }

    private void handleReplyMessageReceived(Consumer<byte[]> consumer, Message<byte[]> msg) {
        try {
            String correlationId = msg.getProperty("webprotege_correlationId");
            if (correlationId == null) {
                logger.info("CorrelationId in reply message is missing.  Cannot handle reply.  Ignoring reply.");
                return;
            }
            String error = msg.getProperty("webprotege_error");
            if (error != null) {
                CommandExecutionException executionException = (CommandExecutionException)this.objectMapper.readValue(error, CommandExecutionException.class);
                CompletableFuture<R> replyHandler = this.replyHandlers.remove(correlationId);
                replyHandler.completeExceptionally(executionException);
                consumer.acknowledge(msg);
            } else {
                CompletableFuture<R> replyHandler = this.replyHandlers.remove(correlationId);
                Response response = (Response)this.objectMapper.readValue(msg.getData(), this.responseClass);
                consumer.acknowledge(msg);
                replyHandler.complete(response);
            }
        }
        catch (PulsarClientException e) {
            logger.error("Encountered Pulsar Client Exception", (Throwable)e);
            throw new UncheckedIOException((IOException)((Object)e));
        }
        catch (IOException e) {
            logger.error("Cannot deserialize reply message on topic {}", (Object)consumer.getTopic(), (Object)e);
            consumer.negativeAcknowledge(msg);
        }
    }

    @PreDestroy
    public void close() {
        if (this.consumer != null) {
            logger.info("Closing consumer listening to {}", (Object)this.consumer.getConsumerName());
            this.consumer.unsubscribeAsync();
            this.consumer.closeAsync();
        }
        if (this.producer != null) {
            logger.info("Closing producer {}", (Object)this.producer.getProducerName());
            this.producer.closeAsync();
        }
    }

    private String getReplyChannelName(Q request) {
        return request.getChannel() + "--replies";
    }
}

