/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import io.axoniq.axonserver.grpc.command.CommandServiceGrpc;
import io.axoniq.axonserver.grpc.command.CommandSubscription;
import io.grpc.ClientInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.command.AxonServerCommandDispatchException;
import org.axonframework.axonserver.connector.command.AxonServerRegistration;
import org.axonframework.axonserver.connector.command.AxonServerRemoteCommandHandlingException;
import org.axonframework.axonserver.connector.command.CommandPriorityCalculator;
import org.axonframework.axonserver.connector.command.CommandSerializer;
import org.axonframework.axonserver.connector.util.ContextAddingInterceptor;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.axonserver.connector.util.TokenAddingInterceptor;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerCommandBus
implements CommandBus {
    private final CommandBus localSegment;
    private final CommandRouterSubscriber commandRouterSubscriber;
    private final AxonServerConnectionManager axonServerConnectionManager;
    private final RoutingStrategy routingStrategy;
    private final CommandPriorityCalculator priorityCalculator;
    private final CommandSerializer serializer;
    private final AxonServerConfiguration configuration;
    private final ClientInterceptor[] interceptors;
    private final DispatchInterceptors<CommandMessage<?>> dispatchInterceptors = new DispatchInterceptors();
    private Logger logger = LoggerFactory.getLogger(AxonServerCommandBus.class);

    public AxonServerCommandBus(AxonServerConnectionManager axonServerConnectionManager, AxonServerConfiguration configuration, CommandBus localSegment, Serializer serializer, RoutingStrategy routingStrategy) {
        this(axonServerConnectionManager, configuration, localSegment, serializer, routingStrategy, new CommandPriorityCalculator(){});
    }

    public AxonServerCommandBus(AxonServerConnectionManager axonServerConnectionManager, AxonServerConfiguration configuration, CommandBus localSegment, Serializer serializer, RoutingStrategy routingStrategy, CommandPriorityCalculator priorityCalculator) {
        this.localSegment = localSegment;
        this.serializer = new CommandSerializer(serializer, configuration);
        this.axonServerConnectionManager = axonServerConnectionManager;
        this.routingStrategy = routingStrategy;
        this.priorityCalculator = priorityCalculator;
        this.configuration = configuration;
        this.commandRouterSubscriber = new CommandRouterSubscriber();
        this.interceptors = new ClientInterceptor[]{new TokenAddingInterceptor(configuration.getToken()), new ContextAddingInterceptor(configuration.getContext())};
    }

    public <C> void dispatch(CommandMessage<C> command) {
        this.dispatch(command, (commandMessage, commandResultMessage) -> {});
    }

    public <C, R> void dispatch(CommandMessage<C> commandMessage, final CommandCallback<? super C, ? super R> commandCallback) {
        this.logger.debug("Dispatch with callback: {}", (Object)commandMessage.getCommandName());
        final CommandMessage<C> command = this.dispatchInterceptors.intercept(commandMessage);
        final AtomicBoolean serverResponded = new AtomicBoolean(false);
        try {
            ((CommandServiceGrpc.CommandServiceStub)CommandServiceGrpc.newStub(this.axonServerConnectionManager.getChannel()).withInterceptors(this.interceptors)).dispatch(this.serializer.serialize(command, this.routingStrategy.getRoutingKey(command), this.priorityCalculator.determinePriority(command)), new StreamObserver<CommandResponse>(){

                public void onNext(CommandResponse commandResponse) {
                    serverResponded.set(true);
                    if (!commandResponse.hasErrorMessage()) {
                        AxonServerCommandBus.this.logger.debug("response received - {}", (Object)commandResponse);
                        try {
                            GenericCommandResultMessage resultMessage = AxonServerCommandBus.this.serializer.deserialize(commandResponse);
                            commandCallback.onResult(command, resultMessage);
                        }
                        catch (Exception ex) {
                            commandCallback.onResult(command, GenericCommandResultMessage.asCommandResultMessage((Throwable)ex));
                            AxonServerCommandBus.this.logger.info("Failed to deserialize payload - {} - {}", (Object)commandResponse.getPayload().getData(), (Object)ex.getCause().getMessage());
                        }
                    } else {
                        commandCallback.onResult(command, GenericCommandResultMessage.asCommandResultMessage((Throwable)((Object)new AxonServerRemoteCommandHandlingException(commandResponse.getErrorCode(), commandResponse.getErrorMessage()))));
                    }
                }

                public void onError(Throwable throwable) {
                    serverResponded.set(true);
                    commandCallback.onResult(command, GenericCommandResultMessage.asCommandResultMessage((Throwable)((Object)new AxonServerCommandDispatchException(ErrorCode.COMMAND_DISPATCH_ERROR.errorCode(), ExceptionSerializer.serialize(AxonServerCommandBus.this.configuration.getClientId(), throwable)))));
                }

                public void onCompleted() {
                    if (!serverResponded.get()) {
                        commandCallback.onResult(command, GenericCommandResultMessage.asCommandResultMessage((Throwable)((Object)new AxonServerCommandDispatchException(ErrorCode.COMMAND_DISPATCH_ERROR.errorCode(), ErrorMessage.newBuilder().setMessage("No result from command executor").build()))));
                    }
                }
            });
        }
        catch (Exception e) {
            this.logger.warn("There was a problem dispatching a command {}.", command, (Object)e);
            commandCallback.onResult(command, GenericCommandResultMessage.asCommandResultMessage((Throwable)((Object)new AxonServerCommandDispatchException(ErrorCode.COMMAND_DISPATCH_ERROR.errorCode(), ExceptionSerializer.serialize(this.configuration.getClientId(), e)))));
        }
    }

    public Registration subscribe(String s, MessageHandler<? super CommandMessage<?>> messageHandler) {
        this.logger.debug("Subscribe: {}", (Object)s);
        this.commandRouterSubscriber.subscribe(s);
        return new AxonServerRegistration(this.localSegment.subscribe(s, messageHandler), () -> this.commandRouterSubscriber.unsubscribe(s));
    }

    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor) {
        return this.localSegment.registerHandlerInterceptor(handlerInterceptor);
    }

    public void disconnect() {
        this.commandRouterSubscriber.disconnect();
    }

    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor) {
        return this.dispatchInterceptors.registerDispatchInterceptor(dispatchInterceptor);
    }

    protected class CommandRouterSubscriber {
        private final CopyOnWriteArraySet<String> subscribedCommands = new CopyOnWriteArraySet();
        private final PriorityBlockingQueue<Command> commandQueue;
        private final ExecutorService executor = Executors.newFixedThreadPool(AxonServerCommandBus.access$200(AxonServerCommandBus.this).getCommandThreads(), (ThreadFactory)new AxonThreadFactory("AxonServerCommandReceiver"));
        private volatile boolean subscribing;
        private volatile boolean running = true;
        private volatile StreamObserver<CommandProviderOutbound> subscriberStreamObserver;

        CommandRouterSubscriber() {
            AxonServerCommandBus.this.axonServerConnectionManager.addReconnectListener(this::resubscribe);
            AxonServerCommandBus.this.axonServerConnectionManager.addDisconnectListener(this::unsubscribeAll);
            this.commandQueue = new PriorityBlockingQueue<Command>(1000, Comparator.comparingLong(c -> -ProcessingInstructionHelper.priority(c.getProcessingInstructionsList())));
            IntStream.range(0, AxonServerCommandBus.this.configuration.getCommandThreads()).forEach(i -> this.executor.submit(this::commandExecutor));
        }

        private void commandExecutor() {
            AxonServerCommandBus.this.logger.debug("Starting command Executor");
            boolean interrupted = false;
            while (!interrupted && this.running) {
                try {
                    Command command = this.commandQueue.poll(1L, TimeUnit.SECONDS);
                    if (command == null) continue;
                    AxonServerCommandBus.this.logger.debug("Received command: {}", (Object)command);
                    this.processCommand(command);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    AxonServerCommandBus.this.logger.warn("Interrupted queryExecutor", (Throwable)e);
                    interrupted = true;
                }
            }
        }

        private void resubscribe() {
            if (this.subscribedCommands.isEmpty() || this.subscribing) {
                return;
            }
            try {
                StreamObserver<CommandProviderOutbound> outboundStreamObserver = this.getSubscriberObserver();
                this.subscribedCommands.forEach(command -> outboundStreamObserver.onNext((Object)CommandProviderOutbound.newBuilder().setSubscribe(CommandSubscription.newBuilder().setCommand((String)command).setComponentName(AxonServerCommandBus.this.configuration.getComponentName()).setClientId(AxonServerCommandBus.this.configuration.getClientId()).setMessageId(UUID.randomUUID().toString()).build()).build()));
            }
            catch (Exception ex) {
                AxonServerCommandBus.this.logger.warn("Error while resubscribing - {}", (Object)ex.getMessage());
            }
        }

        public void subscribe(String command) {
            this.subscribing = true;
            this.subscribedCommands.add(command);
            try {
                StreamObserver<CommandProviderOutbound> outboundStreamObserver = this.getSubscriberObserver();
                outboundStreamObserver.onNext((Object)CommandProviderOutbound.newBuilder().setSubscribe(CommandSubscription.newBuilder().setCommand(command).setClientId(AxonServerCommandBus.this.configuration.getClientId()).setComponentName(AxonServerCommandBus.this.configuration.getComponentName()).setMessageId(UUID.randomUUID().toString()).build()).build());
            }
            catch (Exception sre) {
                AxonServerCommandBus.this.logger.debug("Subscribing command {} with AxonServer failed. Will resubscribe when connection is established.", (Object)command, (Object)sre);
            }
            finally {
                this.subscribing = false;
            }
        }

        private void processCommand(Command command) {
            StreamObserver<CommandProviderOutbound> outboundStreamObserver = this.getSubscriberObserver();
            try {
                this.dispatchLocal(AxonServerCommandBus.this.serializer.deserialize(command), outboundStreamObserver);
            }
            catch (RuntimeException throwable) {
                AxonServerCommandBus.this.logger.error("Error while dispatching command {} - {}", new Object[]{command.getName(), throwable.getMessage(), throwable});
                CommandProviderOutbound response = CommandProviderOutbound.newBuilder().setCommandResponse(CommandResponse.newBuilder().setMessageIdentifier(UUID.randomUUID().toString()).setRequestIdentifier(command.getMessageIdentifier()).setErrorCode(ErrorCode.COMMAND_DISPATCH_ERROR.errorCode()).setErrorMessage(ExceptionSerializer.serialize(AxonServerCommandBus.this.configuration.getClientId(), throwable))).build();
                outboundStreamObserver.onNext((Object)response);
            }
        }

        private synchronized StreamObserver<CommandProviderOutbound> getSubscriberObserver() {
            if (this.subscriberStreamObserver == null) {
                StreamObserver<CommandProviderInbound> commandsFromRoutingServer = new StreamObserver<CommandProviderInbound>(){

                    public void onNext(CommandProviderInbound commandToSubscriber) {
                        AxonServerCommandBus.this.logger.debug("Received from server: {}", (Object)commandToSubscriber);
                        switch (commandToSubscriber.getRequestCase()) {
                            case COMMAND: {
                                CommandRouterSubscriber.this.commandQueue.add(commandToSubscriber.getCommand());
                            }
                        }
                    }

                    public void onError(Throwable ex) {
                        AxonServerCommandBus.this.logger.warn("Received error from server: {}", (Object)ex.getMessage());
                        CommandRouterSubscriber.this.subscriberStreamObserver = null;
                        if (ex instanceof StatusRuntimeException && ((StatusRuntimeException)ex).getStatus().getCode().equals((Object)Status.UNAVAILABLE.getCode())) {
                            return;
                        }
                        CommandRouterSubscriber.this.resubscribe();
                    }

                    public void onCompleted() {
                        AxonServerCommandBus.this.logger.debug("Received completed from server");
                        CommandRouterSubscriber.this.subscriberStreamObserver = null;
                    }
                };
                StreamObserver<CommandProviderOutbound> stream = AxonServerCommandBus.this.axonServerConnectionManager.getCommandStream(commandsFromRoutingServer, AxonServerCommandBus.this.interceptors);
                AxonServerCommandBus.this.logger.info("Creating new subscriber");
                this.subscriberStreamObserver = new FlowControllingStreamObserver<CommandProviderOutbound>(stream, AxonServerCommandBus.this.configuration, flowControl -> CommandProviderOutbound.newBuilder().setFlowControl((FlowControl)flowControl).build(), t -> t.getRequestCase().equals((Object)CommandProviderOutbound.RequestCase.COMMAND_RESPONSE)).sendInitialPermits();
            }
            return this.subscriberStreamObserver;
        }

        public void unsubscribe(String command) {
            this.subscribedCommands.remove(command);
            try {
                this.getSubscriberObserver().onNext((Object)CommandProviderOutbound.newBuilder().setUnsubscribe(CommandSubscription.newBuilder().setCommand(command).setClientId(AxonServerCommandBus.this.configuration.getClientId()).setMessageId(UUID.randomUUID().toString()).build()).build());
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        void unsubscribeAll() {
            for (String command : this.subscribedCommands) {
                try {
                    this.getSubscriberObserver().onNext((Object)CommandProviderOutbound.newBuilder().setUnsubscribe(CommandSubscription.newBuilder().setCommand(command).setClientId(AxonServerCommandBus.this.configuration.getClientId()).setMessageId(UUID.randomUUID().toString()).build()).build());
                }
                catch (Exception exception) {}
            }
            this.subscriberStreamObserver = null;
        }

        private <C> void dispatchLocal(CommandMessage<C> command, StreamObserver<CommandProviderOutbound> responseObserver) {
            AxonServerCommandBus.this.logger.debug("DispatchLocal: {}", (Object)command.getCommandName());
            AxonServerCommandBus.this.localSegment.dispatch(command, (commandMessage, commandResultMessage) -> {
                if (commandResultMessage.isExceptional()) {
                    Throwable throwable = commandResultMessage.exceptionResult();
                    CommandProviderOutbound response = CommandProviderOutbound.newBuilder().setCommandResponse(CommandResponse.newBuilder().setMessageIdentifier(UUID.randomUUID().toString()).setRequestIdentifier(command.getIdentifier()).setErrorCode(ErrorCode.COMMAND_EXECUTION_ERROR.errorCode()).setErrorMessage(ExceptionSerializer.serialize(AxonServerCommandBus.this.configuration.getClientId(), throwable))).build();
                    responseObserver.onNext((Object)response);
                    AxonServerCommandBus.this.logger.info("DispatchLocal: failure {} - {}", new Object[]{command.getCommandName(), throwable.getMessage(), throwable});
                } else {
                    AxonServerCommandBus.this.logger.debug("DispatchLocal: done {}", (Object)command.getCommandName());
                    responseObserver.onNext((Object)AxonServerCommandBus.this.serializer.serialize(commandResultMessage, command.getIdentifier()));
                }
            });
        }

        public void disconnect() {
            if (this.subscriberStreamObserver != null) {
                this.subscriberStreamObserver.onCompleted();
            }
            this.running = false;
            this.executor.shutdown();
        }
    }
}

