/*
 * Decompiled with CFR 0.152.
 */
package edu.stanford.protege.webprotege.ipc.pulsar;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.authorization.ActionId;
import edu.stanford.protege.webprotege.authorization.AuthorizationStatus;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusRequest;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusResponse;
import edu.stanford.protege.webprotege.authorization.Resource;
import edu.stanford.protege.webprotege.authorization.Subject;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.common.UserId;
import edu.stanford.protege.webprotege.ipc.AuthorizedCommandHandler;
import edu.stanford.protege.webprotege.ipc.CommandExecutionException;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
import edu.stanford.protege.webprotege.ipc.CommandHandler;
import edu.stanford.protege.webprotege.ipc.ExecutionContext;
import edu.stanford.protege.webprotege.ipc.pulsar.PulsarProducersManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import reactor.core.publisher.Mono;

public class PulsarCommandHandlerWrapper<Q extends Request<R>, R extends Response> {
    private static final Logger logger = LoggerFactory.getLogger(PulsarCommandHandlerWrapper.class);
    private final String applicationName;
    private final String tenant;
    private final PulsarClient pulsarClient;
    private final CommandHandler<Q, R> handler;
    private final ObjectMapper objectMapper;
    private final PulsarProducersManager producersManager;
    private final CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor;
    private Consumer<byte[]> consumer;

    public PulsarCommandHandlerWrapper(String applicationName, @Value(value="webprotege.pulsar.tenant") String tenant, PulsarClient pulsarClient, CommandHandler<Q, R> handler, ObjectMapper objectMapper, PulsarProducersManager producersManager, CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor) {
        this.applicationName = applicationName;
        this.tenant = tenant;
        this.pulsarClient = pulsarClient;
        this.handler = handler;
        this.objectMapper = objectMapper;
        this.producersManager = producersManager;
        this.authorizationStatusExecutor = authorizationStatusExecutor;
    }

    public void unsubscribe() {
        try {
            this.consumer.unsubscribe();
        }
        catch (PulsarClientException e) {
            logger.warn("An exception was thrown when unsubscribing", (Throwable)e);
        }
    }

    public void subscribe() {
        try {
            this.consumer = this.pulsarClient.newConsumer().topic(new String[]{this.getRequestsTopicUrl(this.handler)}).subscriptionName(this.getSubscriptionName(this.handler)).messageListener(this::handleCommandMessage).subscribe();
        }
        catch (PulsarClientException e) {
            throw new UncheckedIOException((IOException)((Object)e));
        }
    }

    private void handleCommandMessage(Consumer<byte[]> consumer, Message<byte[]> message) {
        String replyChannel = message.getProperty("webprotege_replyChannel");
        if (replyChannel == null) {
            logger.error("webprotege_replyChannel header is missing.  Cannot reply to message.");
            consumer.acknowledgeAsync(message);
            return;
        }
        String correlationId = message.getProperty("webprotege_correlationId");
        if (correlationId == null) {
            logger.error("webprotege_correlationId header is missing.  Cannot process message.");
            consumer.acknowledgeAsync(message);
            return;
        }
        String userId = message.getProperty("webprotege_userId");
        if (userId == null) {
            logger.error("webprotege_userId header is missing.  Cannot process message.  Returning Forbidden Error Code.  Message reply topic: {}", (Object)replyChannel);
            this.replyWithErrorResponse(replyChannel, correlationId, "", HttpStatus.FORBIDDEN);
            consumer.acknowledgeAsync(message);
            return;
        }
        this.parseAndHandleRequest(consumer, message, replyChannel, correlationId, userId);
    }

    private void parseAndHandleRequest(Consumer<byte[]> consumer, Message<byte[]> message, String replyChannel, String correlationId, String userId) {
        try {
            byte[] payload = message.getData();
            Request request = (Request)this.objectMapper.readValue(payload, this.handler.getRequestClass());
            consumer.acknowledgeAsync(message);
            CommandHandler<Q, R> commandHandler = this.handler;
            if (commandHandler instanceof AuthorizedCommandHandler) {
                AuthorizedCommandHandler authorizedCommandHandler = (AuthorizedCommandHandler)commandHandler;
                this.authorizeAndReplyToRequest(replyChannel, correlationId, userId, request, authorizedCommandHandler);
            } else {
                this.handleAndReplyToRequest(replyChannel, correlationId, userId, request);
            }
        }
        catch (IOException e) {
            logger.error("Could not parse request", (Throwable)e);
            consumer.negativeAcknowledge(message);
            this.replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.BAD_REQUEST);
        }
    }

    private void authorizeAndReplyToRequest(String replyChannel, String correlationId, String userId, Q request, AuthorizedCommandHandler<Q, R> authenticatingCommandHandler) {
        Resource resource = authenticatingCommandHandler.getTargetResource(request);
        Subject subject = Subject.forUser((String)userId);
        Collection<ActionId> requiredActionId = authenticatingCommandHandler.getRequiredCapabilities();
        GetAuthorizationStatusRequest authRequest = new GetAuthorizationStatusRequest(resource, subject, (ActionId)requiredActionId.stream().findFirst().orElse(null));
        ExecutionContext executionContext = new ExecutionContext(new UserId(userId), "");
        CompletableFuture<GetAuthorizationStatusResponse> authResponseFuture = this.authorizationStatusExecutor.execute(authRequest, executionContext);
        authResponseFuture.whenComplete((authResponse, authError) -> {
            if (authError != null) {
                logger.warn("An error occurred when requesting the authorization status for {} on {}. Error: {}", new Object[]{userId, resource, authError.getMessage()});
                this.replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.INTERNAL_SERVER_ERROR);
            } else if (authResponse.authorizationStatus() == AuthorizationStatus.AUTHORIZED) {
                this.handleAndReplyToRequest(replyChannel, correlationId, userId, request);
            } else {
                logger.info("Permission denied when attempting to execute a request.  User: {}, Request: {}", (Object)userId, (Object)request);
                this.replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.FORBIDDEN);
            }
        });
    }

    private void handleAndReplyToRequest(String replyChannel, String correlationId, String userId, Q request) {
        ExecutionContext executionContext = new ExecutionContext(new UserId(userId), "");
        try {
            Mono<R> response = this.handler.handleRequest(request, executionContext);
            response.subscribe(r -> {
                this.replyWithSuccessResponse(replyChannel, correlationId, userId, r);
                logger.info("Sent reply to {}", (Object)replyChannel);
            }, throwable -> {
                if (throwable instanceof CommandExecutionException) {
                    CommandExecutionException ex = (CommandExecutionException)throwable;
                    logger.info("The command handler threw a CommandExecutionException exception while handling a request.  Sending an error as the reply to {}.  Code: {}, Message: {},  Request: {}", new Object[]{replyChannel, ex.getStatusCode(), throwable.getMessage(), request});
                    this.replyWithErrorResponse(replyChannel, correlationId, userId, ex.getStatus());
                } else {
                    this.replyWithInternalServerError(replyChannel, correlationId, userId, request, (Throwable)throwable);
                }
            });
        }
        catch (Throwable throwable2) {
            logger.error("Uncaught exception when handling request", throwable2);
            this.replyWithInternalServerError(replyChannel, correlationId, userId, request, throwable2);
        }
    }

    private void replyWithInternalServerError(String replyChannel, String correlationId, String userId, Q request, Throwable throwable) {
        logger.info("The command handler threw an exception while handling a request.  Sending an error as the reply to {}.  Exception class: {}, Message: {},  Request: {}", new Object[]{replyChannel, throwable.getClass().getName(), throwable.getMessage(), request});
        this.replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.INTERNAL_SERVER_ERROR);
    }

    private void replyWithErrorResponse(String replyChannel, String correlationId, String userId, HttpStatus status) {
        String replyTopicUrl = this.getReplyTopicUrl(replyChannel);
        Producer<byte[]> replyProducer = this.producersManager.getProducer(replyTopicUrl);
        CommandExecutionException executionException = new CommandExecutionException(status);
        String value = this.serializeCommandExecutionException(executionException);
        replyProducer.newMessage().property("webprotege_correlationId", correlationId).property("webprotege_userId", userId).property("webprotege_error", value).sendAsync();
    }

    private void replyWithSuccessResponse(String replyChannel, String correlationId, String userId, R response) {
        try {
            String topicUrl = this.getReplyTopicUrl(replyChannel);
            Producer<byte[]> producer = this.producersManager.getProducer(topicUrl);
            byte[] value = this.objectMapper.writeValueAsBytes(response);
            producer.newMessage().property("webprotege_correlationId", correlationId).property("webprotege_userId", userId).value((Object)value).sendAsync();
        }
        catch (JsonProcessingException e) {
            this.replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    private String getReplyTopicUrl(String replyChannel) {
        return this.tenant + "/command-responses/" + replyChannel;
    }

    private String getSubscriptionName(CommandHandler<?, ?> handler) {
        return this.applicationName + "--" + handler.getChannelName() + "--handler";
    }

    private String getConsumerName(CommandHandler<?, ?> handler) {
        return this.applicationName + "--" + handler.getChannelName() + "--handler";
    }

    private String getRequestsTopicUrl(CommandHandler<?, ?> handler) {
        String channelName = handler.getChannelName();
        return this.tenant + "/command-requests/" + channelName;
    }

    private String serializeCommandExecutionException(CommandExecutionException exception) {
        try {
            return this.objectMapper.writeValueAsString((Object)exception);
        }
        catch (JsonProcessingException e) {
            logger.error("Error while serializing CommandExecutionException", (Throwable)e);
            return "{\n    \"statusCode\" : 500\n}\n".strip();
        }
    }
}

