/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.storage.queue.core;

import com.azure.spring.messaging.PartitionSupplier;
import com.azure.spring.messaging.checkpoint.AzureCheckpointer;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import com.azure.spring.storage.queue.core.StorageQueueClientFactory;
import com.azure.spring.storage.queue.core.StorageQueueOperation;
import com.azure.spring.storage.queue.core.StorageQueueRuntimeException;
import com.azure.spring.storage.queue.support.StorageQueueHelper;
import com.azure.spring.storage.queue.support.converter.StorageQueueMessageConverter;
import com.azure.storage.queue.QueueAsyncClient;
import com.azure.storage.queue.models.QueueMessageItem;
import com.azure.storage.queue.models.QueueStorageException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

public class StorageQueueTemplate
implements StorageQueueOperation {
    private static final Logger LOG = LoggerFactory.getLogger(StorageQueueTemplate.class);
    private static final int DEFAULT_VISIBILITY_TIMEOUT_IN_SECONDS = 30;
    private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s in storage queue '%s'";
    private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in storage queue '%s' in %s mode";
    private final StorageQueueClientFactory storageQueueClientFactory;
    protected StorageQueueMessageConverter messageConverter = new StorageQueueMessageConverter();
    private int visibilityTimeoutInSeconds = 30;
    private Class<?> messagePayloadType = byte[].class;
    private CheckpointMode checkpointMode = CheckpointMode.RECORD;

    public StorageQueueTemplate(@NonNull StorageQueueClientFactory storageQueueClientFactory) {
        this.storageQueueClientFactory = storageQueueClientFactory;
        LOG.info("StorageQueueTemplate started with properties {}", this.buildProperties());
    }

    public <T> Mono<Void> sendAsync(String queueName, @NonNull Message<T> message, PartitionSupplier partitionSupplier) {
        Assert.hasText((String)queueName, (String)"queueName can't be null or empty");
        QueueMessageItem queueMessageItem = (QueueMessageItem)this.messageConverter.fromMessage(message, QueueMessageItem.class);
        QueueAsyncClient queueClient = this.storageQueueClientFactory.getOrCreateQueueClient(queueName);
        Assert.notNull((Object)queueMessageItem, (String)"queueMessageItem can't be null");
        return queueClient.sendMessage(queueMessageItem.getMessageText()).then();
    }

    public Mono<Message<?>> receiveAsync(String queueName) {
        return this.receiveAsync(queueName, this.visibilityTimeoutInSeconds);
    }

    private Mono<Message<?>> receiveAsync(String queueName, int visibilityTimeoutInSeconds) {
        Assert.hasText((String)queueName, (String)"queueName can't be null or empty");
        QueueAsyncClient queueClient = this.storageQueueClientFactory.getOrCreateQueueClient(queueName);
        return queueClient.receiveMessages(Integer.valueOf(1), Duration.ofSeconds(visibilityTimeoutInSeconds)).onErrorMap(QueueStorageException.class, e -> new StorageQueueRuntimeException("Failed to send message to storage queue", (Throwable)e)).next().map(messageItem -> {
            HashMap<String, AzureCheckpointer> headers = new HashMap<String, AzureCheckpointer>();
            AzureCheckpointer checkpointer = new AzureCheckpointer(() -> this.checkpoint(queueClient, (QueueMessageItem)messageItem));
            if (this.checkpointMode == CheckpointMode.RECORD) {
                checkpointer.success().subscribe();
            } else if (this.checkpointMode == CheckpointMode.MANUAL) {
                headers.put("azure_checkpointer", checkpointer);
            }
            return this.messageConverter.toMessage(messageItem, (Map)new MessageHeaders(headers), this.messagePayloadType);
        });
    }

    private Mono<Void> checkpoint(QueueAsyncClient queueClient, QueueMessageItem messageItem) {
        return queueClient.deleteMessage(messageItem.getMessageId(), messageItem.getPopReceipt()).doOnSuccess(v -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.buildCheckpointSuccessMessage(messageItem, queueClient.getQueueName()));
            }
        }).doOnError(t -> {
            if (LOG.isWarnEnabled()) {
                LOG.warn(this.buildCheckpointFailMessage(messageItem, queueClient.getQueueName()), t);
            }
        });
    }

    private Map<String, Object> buildProperties() {
        HashMap<String, Object> properties = new HashMap<String, Object>();
        properties.put("visibilityTimeout", this.visibilityTimeoutInSeconds);
        properties.put("messagePayloadType", this.messagePayloadType);
        properties.put("checkpointMode", this.checkpointMode);
        return properties;
    }

    private boolean isValidCheckpointMode(CheckpointMode checkpointMode) {
        return checkpointMode == CheckpointMode.MANUAL || checkpointMode == CheckpointMode.RECORD;
    }

    public void checkpointHandler(QueueMessageItem message, String queueName, Throwable t) {
        if (t != null) {
            if (LOG.isWarnEnabled()) {
                LOG.warn(this.buildCheckpointFailMessage(message, queueName), t);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(this.buildCheckpointSuccessMessage(message, queueName));
        }
    }

    private String buildCheckpointFailMessage(QueueMessageItem cloudQueueMessage, String queueName) {
        return String.format(MSG_FAIL_CHECKPOINT, StorageQueueHelper.toString(cloudQueueMessage), queueName);
    }

    private String buildCheckpointSuccessMessage(QueueMessageItem cloudQueueMessage, String queueName) {
        return String.format(MSG_SUCCESS_CHECKPOINT, StorageQueueHelper.toString(cloudQueueMessage), queueName, this.checkpointMode);
    }

    public StorageQueueMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(StorageQueueMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public int getVisibilityTimeoutInSeconds() {
        return this.visibilityTimeoutInSeconds;
    }

    @Override
    public void setVisibilityTimeoutInSeconds(int timeout) {
        Assert.state((timeout > 0 ? 1 : 0) != 0, (String)"VisibilityTimeoutInSeconds should be positive");
        this.visibilityTimeoutInSeconds = timeout;
        LOG.info("StorageQueueTemplate VisibilityTimeoutInSeconds becomes: {}", (Object)this.visibilityTimeoutInSeconds);
    }

    public Class<?> getMessagePayloadType() {
        return this.messagePayloadType;
    }

    public void setMessagePayloadType(Class<?> payloadType) {
        this.messagePayloadType = payloadType;
        LOG.info("StorageQueueTemplate messagePayloadType becomes: {}", this.messagePayloadType);
    }

    public CheckpointMode getCheckpointMode() {
        return this.checkpointMode;
    }

    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.state((boolean)this.isValidCheckpointMode(checkpointMode), (String)"Only MANUAL or RECORD checkpoint mode is supported in StorageQueueTemplate");
        this.checkpointMode = checkpointMode;
        LOG.info("StorageQueueTemplate checkpoint mode becomes: {}", (Object)this.checkpointMode);
    }
}

