/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.spring.integration.servicebus.queue;

import com.google.common.collect.Sets;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageSession;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.servicebus.ISessionHandler;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.integration.core.AzureCheckpointer;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusClientConfig;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusTemplate;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
import com.microsoft.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

public class ServiceBusQueueTemplate
extends ServiceBusTemplate<ServiceBusQueueClientFactory>
implements ServiceBusQueueOperation {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusQueueTemplate.class);
    private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s in queue '%s'";
    private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in queue '%s' in %s mode";
    private final Set<String> subscribedQueues = Sets.newConcurrentHashSet();

    public ServiceBusQueueTemplate(ServiceBusQueueClientFactory clientFactory) {
        super(clientFactory);
    }

    public boolean subscribe(String destination, @NonNull Consumer<Message<?>> consumer, @NonNull Class<?> targetPayloadClass) {
        Assert.hasText((String)destination, (String)"destination can't be null or empty");
        if (this.subscribedQueues.contains(destination)) {
            return false;
        }
        this.subscribedQueues.add(destination);
        this.internalSubscribe(destination, consumer, targetPayloadClass);
        return true;
    }

    public boolean unsubscribe(String destination) {
        return this.subscribedQueues.remove(destination);
    }

    protected void internalSubscribe(String name, Consumer<Message<?>> consumer, Class<?> payloadType) {
        IQueueClient queueClient = ((ServiceBusQueueClientFactory)this.senderFactory).getOrCreateClient(name);
        String threadPrefix = String.format("%s-handler", name);
        try {
            queueClient.setPrefetchCount(this.clientConfig.getPrefetchCount());
            if (this.clientConfig.isSessionsEnabled()) {
                queueClient.registerSessionHandler(new QueueMessageHandler(consumer, payloadType, queueClient), this.buildSessionHandlerOptions(), this.buildHandlerExecutors(threadPrefix));
            } else {
                queueClient.registerMessageHandler(new QueueMessageHandler(consumer, payloadType, queueClient), this.buildHandlerOptions(), this.buildHandlerExecutors(threadPrefix));
            }
        }
        catch (ServiceBusException | InterruptedException e) {
            LOG.error("Failed to register queue message handler", e);
            throw new ServiceBusRuntimeException("Failed to register queue message handler", e);
        }
    }

    @Override
    public void setClientConfig(@NonNull ServiceBusClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }

    protected class QueueMessageHandler<U>
    extends ServiceBusMessageHandler<U>
    implements ISessionHandler {
        private final IQueueClient queueClient;

        public QueueMessageHandler(Consumer<Message<U>> consumer, Class<U> payloadType, IQueueClient queueClient) {
            super(consumer, payloadType, ServiceBusQueueTemplate.this.getCheckpointConfig(), ServiceBusQueueTemplate.this.getMessageConverter());
            this.queueClient = queueClient;
        }

        @Override
        protected CompletableFuture<Void> success(UUID uuid) {
            return this.queueClient.completeAsync(uuid);
        }

        @Override
        protected CompletableFuture<Void> failure(UUID uuid) {
            return this.queueClient.abandonAsync(uuid);
        }

        @Override
        protected String buildCheckpointFailMessage(Message<?> message) {
            return String.format(ServiceBusQueueTemplate.MSG_FAIL_CHECKPOINT, message, this.queueClient.getQueueName());
        }

        @Override
        protected String buildCheckpointSuccessMessage(Message<?> message) {
            return String.format(ServiceBusQueueTemplate.MSG_SUCCESS_CHECKPOINT, message, this.queueClient.getQueueName(), ServiceBusQueueTemplate.this.getCheckpointConfig().getCheckpointMode());
        }

        public CompletableFuture<Void> onMessageAsync(IMessageSession session, IMessage serviceBusMessage) {
            HashMap<String, AzureCheckpointer> headers = new HashMap<String, AzureCheckpointer>();
            AzureCheckpointer checkpointer = new AzureCheckpointer(() -> session.completeAsync(serviceBusMessage.getLockToken()), () -> session.abandonAsync(serviceBusMessage.getLockToken()));
            if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.MANUAL) {
                headers.put("azure_checkpointer", checkpointer);
            }
            Message message = this.messageConverter.toMessage(serviceBusMessage, (Map)new MessageHeaders(headers), this.payloadType);
            this.consumer.accept(message);
            if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.RECORD) {
                return checkpointer.success().whenComplete((v, t) -> super.checkpointHandler((Message<?>)message, (Throwable)t));
            }
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession session) {
            LOG.info("Closed session '" + session.getSessionId() + "' for subscription: " + session.getEntityPath());
            return CompletableFuture.completedFuture(null);
        }
    }
}

