/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.google.gson.Gson;
import com.microsoft.azure.eventhubs.IllegalEntityException;
import com.microsoft.azure.eventprocessorhost.AzureBlobLease;
import com.microsoft.azure.eventprocessorhost.Checkpoint;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.ICheckpointManager;
import com.microsoft.azure.eventprocessorhost.ILeaseManager;
import com.microsoft.azure.eventprocessorhost.Lease;
import com.microsoft.azure.eventprocessorhost.LeaseLostException;
import com.microsoft.azure.eventprocessorhost.LoggingUtils;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageExtendedErrorInformation;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.LeaseState;
import com.microsoft.azure.storage.blob.ListBlobItem;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Hashtable;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AzureStorageCheckpointLeaseManager
implements ICheckpointManager,
ILeaseManager {
    private EventProcessorHost host;
    private final String storageConnectionString;
    private String storageContainerName;
    private String storageBlobPrefix;
    private CloudBlobClient storageClient;
    private CloudBlobContainer eventHubContainer;
    private CloudBlobDirectory consumerGroupDirectory;
    private Gson gson;
    private final BlobRequestOptions leaseOperationOptions = new BlobRequestOptions();
    private final BlobRequestOptions checkpointOperationOptions = new BlobRequestOptions();
    private final BlobRequestOptions renewRequestOptions = new BlobRequestOptions();
    private Hashtable<String, Checkpoint> latestCheckpoint = new Hashtable();
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(AzureStorageCheckpointLeaseManager.class);

    AzureStorageCheckpointLeaseManager(String storageConnectionString) {
        this(storageConnectionString, null);
    }

    AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName) {
        this(storageConnectionString, storageContainerName, "");
    }

    AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName, String storageBlobPrefix) {
        if (storageConnectionString == null || storageConnectionString.trim().isEmpty()) {
            throw new IllegalArgumentException("Provide valid Azure Storage connection string when using Azure Storage");
        }
        this.storageConnectionString = storageConnectionString;
        if (storageContainerName != null && storageContainerName.trim().isEmpty()) {
            throw new IllegalArgumentException("Azure Storage container name must be a valid container name or null to use the default");
        }
        this.storageContainerName = storageContainerName;
        this.storageBlobPrefix = storageBlobPrefix != null ? storageBlobPrefix.trim() : "";
    }

    void initialize(EventProcessorHost host) throws InvalidKeyException, URISyntaxException, StorageException {
        this.host = host;
        if (this.storageContainerName == null) {
            this.storageContainerName = this.host.getEventHubPath();
            Pattern p = Pattern.compile("^(?-i)(?:[a-z0-9]|(?<=[0-9a-z])-(?=[0-9a-z])){3,63}$");
            Matcher m = p.matcher(this.storageContainerName);
            if (!m.find()) {
                throw new IllegalArgumentException("EventHub names must conform to the following rules to be able to use it with EventProcessorHost: Must start with a letter or number, and can contain only letters, numbers, and the dash (-) character. Every dash (-) character must be immediately preceded and followed by a letter or number; consecutive dashes are not permitted in container names. All letters in a container name must be lowercase. Must be from 3 to 63 characters long.");
            }
        }
        this.storageClient = CloudStorageAccount.parse((String)this.storageConnectionString).createCloudBlobClient();
        this.eventHubContainer = this.storageClient.getContainerReference(this.storageContainerName);
        this.consumerGroupDirectory = this.eventHubContainer.getDirectoryReference(this.storageBlobPrefix + this.host.getConsumerGroupName());
        this.gson = new Gson();
        this.leaseOperationOptions.setMaximumExecutionTimeInMs(Integer.valueOf(host.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000));
        this.storageClient.setDefaultRequestOptions(this.leaseOperationOptions);
        this.checkpointOperationOptions.setMaximumExecutionTimeInMs(Integer.valueOf(host.getPartitionManagerOptions().getCheckpointTimeoutInSeconds() * 1000));
        this.renewRequestOptions.setMaximumExecutionTimeInMs(Integer.valueOf(host.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000));
    }

    @Override
    public Future<Boolean> checkpointStoreExists() {
        return this.leaseStoreExists(this.checkpointOperationOptions);
    }

    @Override
    public Future<Boolean> createCheckpointStoreIfNotExists() {
        return this.createLeaseStoreIfNotExists(this.checkpointOperationOptions);
    }

    @Override
    public Future<Boolean> deleteCheckpointStore() {
        return this.deleteLeaseStore(this.checkpointOperationOptions);
    }

    @Override
    public Future<Checkpoint> getCheckpoint(String partitionId) {
        return this.host.getExecutorService().submit(() -> this.getCheckpointSync(partitionId));
    }

    private Checkpoint getCheckpointSync(String partitionId) throws URISyntaxException, IOException, StorageException {
        AzureBlobLease lease = this.getLeaseSync(partitionId, this.checkpointOperationOptions);
        Checkpoint checkpoint = null;
        if (lease.getOffset() != null) {
            checkpoint = new Checkpoint(partitionId);
            checkpoint.setOffset(lease.getOffset());
            checkpoint.setSequenceNumber(lease.getSequenceNumber());
        }
        return checkpoint;
    }

    @Override
    public Future<Checkpoint> createCheckpointIfNotExists(String partitionId) {
        return this.host.getExecutorService().submit(() -> this.createCheckpointIfNotExistsSync(partitionId));
    }

    private Checkpoint createCheckpointIfNotExistsSync(String partitionId) throws Exception {
        AzureBlobLease lease = this.createLeaseIfNotExistsSync(partitionId, this.checkpointOperationOptions);
        Checkpoint checkpoint = null;
        if (lease.getOffset() != null) {
            checkpoint = new Checkpoint(partitionId, lease.getOffset(), lease.getSequenceNumber());
        }
        return checkpoint;
    }

    @Override
    @Deprecated
    public Future<Void> updateCheckpoint(Checkpoint checkpoint) {
        throw new RuntimeException("Use updateCheckpoint(checkpoint, lease) instead.");
    }

    @Override
    public Future<Void> updateCheckpoint(Lease lease, Checkpoint checkpoint) {
        return this.host.getExecutorService().submit(() -> this.updateCheckpointSync(lease, checkpoint));
    }

    private Void updateCheckpointSync(Lease lease, Checkpoint checkpoint) throws Exception {
        AzureBlobLease updatedLease = new AzureBlobLease((AzureBlobLease)lease);
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber()));
        updatedLease.setOffset(checkpoint.getOffset());
        updatedLease.setSequenceNumber(checkpoint.getSequenceNumber());
        this.updateLeaseSync(updatedLease, this.checkpointOperationOptions);
        return null;
    }

    @Override
    public Future<Void> deleteCheckpoint(String partitionId) {
        return this.host.getExecutorService().submit(() -> this.deleteCheckpointSync(partitionId));
    }

    private Void deleteCheckpointSync(String partitionId) throws Exception {
        AzureBlobLease lease = this.getLeaseSync(partitionId, this.checkpointOperationOptions);
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "Deleting checkpoint for " + partitionId));
        lease.setOffset(null);
        lease.setSequenceNumber(0L);
        this.updateLeaseSync(lease, this.checkpointOperationOptions);
        return null;
    }

    @Override
    public int getLeaseRenewIntervalInMilliseconds() {
        return this.host.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds() * 1000;
    }

    @Override
    public int getLeaseDurationInMilliseconds() {
        return this.host.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000;
    }

    @Override
    public Future<Boolean> leaseStoreExists() {
        return this.leaseStoreExists(this.leaseOperationOptions);
    }

    private Future<Boolean> leaseStoreExists(BlobRequestOptions options) {
        return this.host.getExecutorService().submit(() -> this.eventHubContainer.exists(null, options, null));
    }

    @Override
    public Future<Boolean> createLeaseStoreIfNotExists() {
        return this.createLeaseStoreIfNotExists(this.leaseOperationOptions);
    }

    private Future<Boolean> createLeaseStoreIfNotExists(BlobRequestOptions options) {
        return this.host.getExecutorService().submit(() -> this.eventHubContainer.createIfNotExists(options, null));
    }

    @Override
    public Future<Boolean> deleteLeaseStore() {
        return this.host.getExecutorService().submit(() -> this.deleteLeaseStoreSync(this.leaseOperationOptions));
    }

    private Future<Boolean> deleteLeaseStore(BlobRequestOptions options) {
        return this.host.getExecutorService().submit(() -> this.deleteLeaseStoreSync(options));
    }

    private Boolean deleteLeaseStoreSync(BlobRequestOptions options) {
        boolean retval = true;
        for (ListBlobItem blob : this.eventHubContainer.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), options, null)) {
            if (blob instanceof CloudBlobDirectory) {
                try {
                    for (ListBlobItem subBlob : ((CloudBlobDirectory)blob).listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), options, null)) {
                        ((CloudBlockBlob)subBlob).deleteIfExists(DeleteSnapshotsOption.NONE, null, options, null);
                    }
                    continue;
                }
                catch (StorageException | URISyntaxException e) {
                    TRACE_LOGGER.warn(LoggingUtils.withHost(this.host.getHostName(), "Failure while deleting lease store"), e);
                    retval = false;
                    continue;
                }
            }
            if (!(blob instanceof CloudBlockBlob)) continue;
            try {
                ((CloudBlockBlob)blob).deleteIfExists(DeleteSnapshotsOption.NONE, null, options, null);
            }
            catch (StorageException e) {
                TRACE_LOGGER.warn(LoggingUtils.withHost(this.host.getHostName(), "Failure while deleting lease store"), (Throwable)e);
                retval = false;
            }
        }
        try {
            this.eventHubContainer.deleteIfExists(null, options, null);
        }
        catch (StorageException e) {
            TRACE_LOGGER.warn(LoggingUtils.withHost(this.host.getHostName(), "Failure while deleting lease store"), (Throwable)e);
            retval = false;
        }
        return retval;
    }

    @Override
    public Future<Lease> getLease(String partitionId) {
        return this.host.getExecutorService().submit(() -> this.getLeaseSync(partitionId, this.leaseOperationOptions));
    }

    private AzureBlobLease getLeaseSync(String partitionId, BlobRequestOptions options) throws URISyntaxException, IOException, StorageException {
        AzureBlobLease retval = null;
        CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
        if (leaseBlob.exists(null, options, null)) {
            retval = this.downloadLease(leaseBlob, options);
        }
        return retval;
    }

    @Override
    public Iterable<Future<Lease>> getAllLeases() throws IllegalEntityException {
        String[] partitionIds;
        ArrayList<Future<Lease>> leaseFutures = new ArrayList<Future<Lease>>();
        for (String id : partitionIds = this.host.getPartitionManager().getPartitionIds()) {
            leaseFutures.add(this.getLease(id));
        }
        return leaseFutures;
    }

    @Override
    public Future<Lease> createLeaseIfNotExists(String partitionId) {
        return this.host.getExecutorService().submit(() -> this.createLeaseIfNotExistsSync(partitionId, this.leaseOperationOptions));
    }

    private AzureBlobLease createLeaseIfNotExistsSync(String partitionId, BlobRequestOptions options) throws URISyntaxException, IOException, StorageException {
        AzureBlobLease returnLease = null;
        try {
            CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
            returnLease = new AzureBlobLease(partitionId, leaseBlob, this.leaseOperationOptions);
            TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "CreateLeaseIfNotExist - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.host.getConsumerGroupName() + "storageBlobPrefix: " + this.storageBlobPrefix));
            this.uploadLease(returnLease, leaseBlob, AccessCondition.generateIfNoneMatchCondition((String)"*"), UploadActivity.Create, options);
        }
        catch (StorageException se) {
            StorageExtendedErrorInformation extendedErrorInfo = se.getExtendedErrorInformation();
            if (extendedErrorInfo != null && (extendedErrorInfo.getErrorCode().compareTo("BlobAlreadyExists") == 0 || extendedErrorInfo.getErrorCode().compareTo("LeaseIdMissing") == 0)) {
                TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "Lease already exists"));
                returnLease = this.getLeaseSync(partitionId, options);
            }
            TRACE_LOGGER.warn(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "CreateLeaseIfNotExist StorageException - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.host.getConsumerGroupName() + "storageBlobPrefix: " + this.storageBlobPrefix), (Throwable)se);
            throw se;
        }
        return returnLease;
    }

    @Override
    public Future<Void> deleteLease(Lease lease) {
        return this.host.getExecutorService().submit(() -> this.deleteLeaseSync((AzureBlobLease)lease));
    }

    private Void deleteLeaseSync(AzureBlobLease lease) throws StorageException {
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "Deleting lease"));
        lease.getBlob().deleteIfExists();
        return null;
    }

    @Override
    public Future<Boolean> acquireLease(Lease lease) {
        return this.host.getExecutorService().submit(() -> this.acquireLeaseSync((AzureBlobLease)lease));
    }

    private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception {
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "Acquiring lease"));
        CloudBlockBlob leaseBlob = lease.getBlob();
        boolean succeeded = true;
        String newLeaseId = EventProcessorHost.safeCreateUUID();
        if (newLeaseId == null || newLeaseId.isEmpty()) {
            throw new IllegalArgumentException("acquireLeaseSync: newLeaseId really is " + (newLeaseId == null ? "null" : "empty"));
        }
        try {
            String newToken = null;
            leaseBlob.downloadAttributes();
            if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED) {
                TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "changeLease"));
                if (lease.getToken() == null || lease.getToken().isEmpty()) {
                    succeeded = false;
                } else {
                    newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition((String)lease.getToken()));
                }
            } else {
                TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "acquireLease"));
                newToken = leaseBlob.acquireLease(Integer.valueOf(this.host.getPartitionManagerOptions().getLeaseDurationInSeconds()), newLeaseId);
            }
            if (succeeded) {
                lease.setToken(newToken);
                lease.setOwner(this.host.getHostName());
                lease.incrementEpoch();
                this.uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition((String)lease.getToken()), UploadActivity.Acquire, this.leaseOperationOptions);
            }
        }
        catch (StorageException se) {
            if (this.wasLeaseLost(se, lease.getPartitionId())) {
                succeeded = false;
            }
            throw se;
        }
        return succeeded;
    }

    @Override
    public Future<Boolean> renewLease(Lease lease) {
        return this.host.getExecutorService().submit(() -> this.renewLeaseSync((AzureBlobLease)lease));
    }

    private Boolean renewLeaseSync(AzureBlobLease lease) throws Exception {
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "Renewing lease"));
        CloudBlockBlob leaseBlob = lease.getBlob();
        boolean retval = true;
        try {
            leaseBlob.renewLease(AccessCondition.generateLeaseCondition((String)lease.getToken()), this.renewRequestOptions, null);
        }
        catch (StorageException se) {
            if (this.wasLeaseLost(se, lease.getPartitionId())) {
                retval = false;
            }
            throw se;
        }
        return retval;
    }

    @Override
    public Future<Boolean> releaseLease(Lease lease) {
        return this.host.getExecutorService().submit(() -> this.releaseLeaseSync((AzureBlobLease)lease));
    }

    private Boolean releaseLeaseSync(AzureBlobLease lease) throws Exception {
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "Releasing lease"));
        CloudBlockBlob leaseBlob = lease.getBlob();
        boolean retval = true;
        try {
            String leaseId = lease.getToken();
            AzureBlobLease releasedCopy = new AzureBlobLease(lease);
            releasedCopy.setToken("");
            releasedCopy.setOwner("");
            this.uploadLease(releasedCopy, leaseBlob, AccessCondition.generateLeaseCondition((String)leaseId), UploadActivity.Release, this.leaseOperationOptions);
            leaseBlob.releaseLease(AccessCondition.generateLeaseCondition((String)leaseId));
        }
        catch (StorageException se) {
            if (this.wasLeaseLost(se, lease.getPartitionId())) {
                retval = false;
            }
            throw se;
        }
        return retval;
    }

    @Override
    public Future<Boolean> updateLease(Lease lease) {
        return this.host.getExecutorService().submit(() -> this.updateLeaseSync((AzureBlobLease)lease, this.leaseOperationOptions));
    }

    public Boolean updateLeaseSync(AzureBlobLease lease, BlobRequestOptions options) throws Exception {
        if (lease == null) {
            return false;
        }
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "Updating lease"));
        String token = lease.getToken();
        if (token == null || token.length() == 0) {
            return false;
        }
        if (!this.renewLeaseSync(lease).booleanValue()) {
            return false;
        }
        CloudBlockBlob leaseBlob = lease.getBlob();
        try {
            this.uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition((String)token), UploadActivity.Update, options);
        }
        catch (StorageException se) {
            if (this.wasLeaseLost(se, lease.getPartitionId())) {
                throw new LeaseLostException(lease, (Throwable)se);
            }
            throw se;
        }
        return true;
    }

    private AzureBlobLease downloadLease(CloudBlockBlob blob, BlobRequestOptions options) throws StorageException, IOException {
        String jsonLease = blob.downloadText(null, null, options, null);
        TRACE_LOGGER.info(LoggingUtils.withHost(this.host.getHostName(), "Raw JSON downloaded: " + jsonLease));
        AzureBlobLease rehydrated = (AzureBlobLease)this.gson.fromJson(jsonLease, AzureBlobLease.class);
        AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob, this.leaseOperationOptions);
        if (blobLease.getOffset() != null) {
            this.latestCheckpoint.put(blobLease.getPartitionId(), blobLease.getCheckpoint());
        }
        return blobLease;
    }

    private void uploadLease(AzureBlobLease lease, CloudBlockBlob blob, AccessCondition condition, UploadActivity activity, BlobRequestOptions options) throws StorageException, IOException {
        if (activity != UploadActivity.Create) {
            Checkpoint cached = this.latestCheckpoint.get(lease.getPartitionId());
            if (cached != null && (cached.getSequenceNumber() > lease.getSequenceNumber() || lease.getOffset() == null)) {
                lease.setOffset(cached.getOffset());
                lease.setSequenceNumber(cached.getSequenceNumber());
                TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "Replacing stale offset/seqno while uploading lease"));
            } else if (lease.getOffset() != null) {
                this.latestCheckpoint.put(lease.getPartitionId(), lease.getCheckpoint());
            }
        }
        String jsonLease = this.gson.toJson((Object)lease);
        blob.uploadText(jsonLease, null, condition, options, null);
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), lease.getPartitionId(), "Raw JSON uploading for " + (Object)((Object)activity) + ": " + jsonLease));
    }

    private boolean wasLeaseLost(StorageException se, String partitionId) {
        StorageExtendedErrorInformation extendedErrorInfo;
        boolean retval = false;
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "WAS LEASE LOST?"));
        TRACE_LOGGER.info(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "Http " + se.getHttpStatusCode()));
        if (se.getExtendedErrorInformation() != null) {
            TRACE_LOGGER.warn(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage()));
        }
        if ((se.getHttpStatusCode() == 409 || se.getHttpStatusCode() == 412) && (extendedErrorInfo = se.getExtendedErrorInformation()) != null) {
            String errorCode = extendedErrorInfo.getErrorCode();
            TRACE_LOGGER.warn(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "Error code: " + errorCode));
            TRACE_LOGGER.warn(LoggingUtils.withHostAndPartition(this.host.getHostName(), partitionId, "Error message: " + extendedErrorInfo.getErrorMessage()));
            if (errorCode.compareTo("LeaseLost") == 0 || errorCode.compareTo("LeaseIdMismatchWithLeaseOperation") == 0 || errorCode.compareTo("LeaseIdMismatchWithBlobOperation") == 0 || errorCode.compareTo("LeaseAlreadyPresent") == 0) {
                retval = true;
            }
        }
        return retval;
    }

    private static enum UploadActivity {
        Create,
        Acquire,
        Release,
        Update;

    }
}

