/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.http.rest.Response;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.checkpointstore.blob.Messages;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.ListBlobsOptions;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BlobCheckpointStore
implements CheckpointStore {
    private static final String SEQUENCE_NUMBER = "sequencenumber";
    private static final String OFFSET = "offset";
    private static final String OWNER_ID = "ownerid";
    private static final String ETAG = "eTag";
    private static final String BLOB_PATH_SEPARATOR = "/";
    private static final String CHECKPOINT_PATH = "/checkpoint/";
    private static final String OWNERSHIP_PATH = "/ownership/";
    private static final String PARTITION_ID_LOG_KEY = "partitionId";
    private static final String OWNER_ID_LOG_KEY = "ownerId";
    private static final String SEQUENCE_NUMBER_LOG_KEY = "sequenceNumber";
    private static final String BLOB_NAME_LOG_KEY = "blobName";
    private static final String OFFSET_LOG_KEY = "offset";
    public static final String EMPTY_STRING = "";
    private static final ByteBuffer UPLOAD_DATA = ByteBuffer.wrap("".getBytes(StandardCharsets.UTF_8));
    private static final ClientLogger LOGGER = new ClientLogger(BlobCheckpointStore.class);
    private final BlobContainerAsyncClient blobContainerAsyncClient;
    private final Map<String, BlobAsyncClient> blobClients = new ConcurrentHashMap<String, BlobAsyncClient>();

    public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient) {
        this.blobContainerAsyncClient = blobContainerAsyncClient;
    }

    public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        String prefix = this.getBlobPrefix(fullyQualifiedNamespace, eventHubName, consumerGroup, OWNERSHIP_PATH);
        return this.listBlobs(prefix, this::convertToPartitionOwnership);
    }

    public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        String prefix = this.getBlobPrefix(fullyQualifiedNamespace, eventHubName, consumerGroup, CHECKPOINT_PATH);
        return this.listBlobs(prefix, this::convertToCheckpoint);
    }

    private <T> Flux<T> listBlobs(String prefix, Function<BlobItem, Mono<T>> converter) {
        BlobListDetails details = new BlobListDetails().setRetrieveMetadata(true);
        ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix).setDetails(details);
        return this.blobContainerAsyncClient.listBlobs(options).flatMap(converter).filter(Objects::nonNull);
    }

    private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
        String[] names = blobItem.getName().split(BLOB_PATH_SEPARATOR);
        LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.FOUND_BLOB_FOR_PARTITION);
        if (names.length == 5) {
            if (CoreUtils.isNullOrEmpty((Map)blobItem.getMetadata())) {
                LOGGER.atWarning().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB);
                return Mono.empty();
            }
            Map metadata = blobItem.getMetadata();
            LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).addKeyValue(SEQUENCE_NUMBER_LOG_KEY, (String)metadata.get(SEQUENCE_NUMBER)).addKeyValue("offset", (String)metadata.get("offset")).log(Messages.CHECKPOINT_INFO);
            Long sequenceNumber = null;
            Long offset = null;
            if (!CoreUtils.isNullOrEmpty((CharSequence)((CharSequence)metadata.get(SEQUENCE_NUMBER)))) {
                sequenceNumber = Long.parseLong((String)metadata.get(SEQUENCE_NUMBER));
            }
            if (!CoreUtils.isNullOrEmpty((CharSequence)((CharSequence)metadata.get("offset")))) {
                offset = Long.parseLong((String)metadata.get("offset"));
            }
            Checkpoint checkpoint = new Checkpoint().setFullyQualifiedNamespace(names[0]).setEventHubName(names[1]).setConsumerGroup(names[2]).setPartitionId(names[4]).setSequenceNumber(sequenceNumber).setOffset(offset);
            return Mono.just((Object)checkpoint);
        }
        return Mono.empty();
    }

    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
        return Flux.fromIterable(requestedPartitionOwnerships).flatMap(partitionOwnership -> {
            try {
                String partitionId = partitionOwnership.getPartitionId();
                String blobName = this.getBlobName(partitionOwnership.getFullyQualifiedNamespace(), partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId, OWNERSHIP_PATH);
                if (!this.blobClients.containsKey(blobName)) {
                    this.blobClients.put(blobName, this.blobContainerAsyncClient.getBlobAsyncClient(blobName));
                }
                BlobAsyncClient blobAsyncClient = this.blobClients.get(blobName);
                HashMap<String, String> metadata = new HashMap<String, String>();
                metadata.put(OWNER_ID, partitionOwnership.getOwnerId());
                BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
                if (CoreUtils.isNullOrEmpty((CharSequence)partitionOwnership.getETag())) {
                    blobRequestConditions.setIfNoneMatch("*");
                    return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just((Object)UPLOAD_DATA), 0L, null, metadata, null, null, blobRequestConditions).flatMapMany(response -> this.updateOwnershipETag((Response<?>)response, (PartitionOwnership)partitionOwnership), error -> {
                        LOGGER.atVerbose().addKeyValue(PARTITION_ID_LOG_KEY, partitionId).log(Messages.CLAIM_ERROR, new Object[]{error});
                        return Mono.error((Throwable)error);
                    }, Mono::empty);
                }
                blobRequestConditions.setIfMatch(partitionOwnership.getETag());
                return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions).flatMapMany(response -> this.updateOwnershipETag((Response<?>)response, (PartitionOwnership)partitionOwnership), error -> {
                    LOGGER.atVerbose().addKeyValue(PARTITION_ID_LOG_KEY, partitionId).log(Messages.CLAIM_ERROR, new Object[]{error});
                    return Mono.error((Throwable)error);
                }, Mono::empty);
            }
            catch (Exception ex) {
                LOGGER.atWarning().addKeyValue(PARTITION_ID_LOG_KEY, partitionOwnership.getPartitionId()).log(Messages.CLAIM_ERROR, new Object[]{ex});
                return Mono.error((Throwable)ex);
            }
        });
    }

    private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, PartitionOwnership ownership) {
        return Mono.just((Object)ownership.setETag(response.getHeaders().get(ETAG).getValue()));
    }

    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        if (checkpoint == null || checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null) {
            throw LOGGER.logExceptionAsWarning(Exceptions.propagate((Throwable)new IllegalStateException("Both sequence number and offset cannot be null when updating a checkpoint")));
        }
        String partitionId = checkpoint.getPartitionId();
        String blobName = this.getBlobName(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(), checkpoint.getConsumerGroup(), partitionId, CHECKPOINT_PATH);
        if (!this.blobClients.containsKey(blobName)) {
            this.blobClients.put(blobName, this.blobContainerAsyncClient.getBlobAsyncClient(blobName));
        }
        HashMap<String, String> metadata = new HashMap<String, String>();
        String sequenceNumber = checkpoint.getSequenceNumber() == null ? null : String.valueOf(checkpoint.getSequenceNumber());
        String offset = checkpoint.getOffset() == null ? null : String.valueOf(checkpoint.getOffset());
        metadata.put(SEQUENCE_NUMBER, sequenceNumber);
        metadata.put("offset", offset);
        BlobAsyncClient blobAsyncClient = this.blobClients.get(blobName);
        return blobAsyncClient.exists().flatMap(exists -> {
            if (exists.booleanValue()) {
                return blobAsyncClient.setMetadata(metadata);
            }
            return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just((Object)UPLOAD_DATA), 0L, null, metadata, null, null, null).then();
        });
    }

    private String getBlobPrefix(String fullyQualifiedNamespace, String eventHubName, String consumerGroupName, String typeSuffix) {
        return fullyQualifiedNamespace + BLOB_PATH_SEPARATOR + eventHubName + BLOB_PATH_SEPARATOR + consumerGroupName + typeSuffix;
    }

    private String getBlobName(String fullyQualifiedNamespace, String eventHubName, String consumerGroupName, String partitionId, String typeSuffix) {
        return fullyQualifiedNamespace + BLOB_PATH_SEPARATOR + eventHubName + BLOB_PATH_SEPARATOR + consumerGroupName + typeSuffix + partitionId;
    }

    private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem) {
        LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.FOUND_BLOB_FOR_PARTITION);
        String[] names = blobItem.getName().split(BLOB_PATH_SEPARATOR);
        if (names.length == 5) {
            if (CoreUtils.isNullOrEmpty((Map)blobItem.getMetadata())) {
                LOGGER.atWarning().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB);
                return Mono.empty();
            }
            BlobItemProperties blobProperties = blobItem.getProperties();
            String ownerId = blobItem.getMetadata().getOrDefault(OWNER_ID, EMPTY_STRING);
            if (ownerId == null) {
                ownerId = EMPTY_STRING;
            }
            LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).addKeyValue(OWNER_ID_LOG_KEY, ownerId).log(Messages.BLOB_OWNER_INFO);
            PartitionOwnership partitionOwnership = new PartitionOwnership().setFullyQualifiedNamespace(names[0]).setEventHubName(names[1]).setConsumerGroup(names[2]).setPartitionId(names[4]).setOwnerId(ownerId).setLastModifiedTime(Long.valueOf(blobProperties.getLastModified().toInstant().toEpochMilli())).setETag(blobProperties.getETag());
            return Mono.just((Object)partitionOwnership);
        }
        return Mono.empty();
    }
}

