/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.google.gson.Gson;
import com.microsoft.azure.eventprocessorhost.AzureBlobLease;
import com.microsoft.azure.eventprocessorhost.Checkpoint;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.ICheckpointManager;
import com.microsoft.azure.eventprocessorhost.ILeaseManager;
import com.microsoft.azure.eventprocessorhost.Lease;
import com.microsoft.azure.eventprocessorhost.LeaseLostException;
import com.microsoft.azure.servicebus.IllegalEntityException;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageExtendedErrorInformation;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.LeaseState;
import com.microsoft.azure.storage.blob.ListBlobItem;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.concurrent.Future;
import java.util.logging.Level;

/**
 * Lease and checkpoint manager backed by Azure Storage blobs.
 *
 * <p>Each partition is represented by one block blob, named after the partition id, inside a blob
 * directory named {@code storageBlobPrefix + consumerGroupName}. The blob content is the Gson JSON
 * serialization of an {@link AzureBlobLease}. The checkpoint (offset and sequence number) is
 * stored inside that same lease object, so every checkpoint operation in this class is implemented
 * in terms of the corresponding lease operation.
 *
 * <p>All {@code Future}-returning methods submit their work to
 * {@link EventProcessorHost#getExecutorService()}; the {@code *Sync} methods contain the actual
 * logic. {@link #initialize(EventProcessorHost)} must be called before any other method because
 * it creates the storage client, container and directory references used everywhere else.
 */
class AzureStorageCheckpointLeaseManager implements ICheckpointManager, ILeaseManager {
    /** Maximum execution time applied to storage requests made through the default options. */
    private static final int storageMaximumExecutionTimeInMs = 120000;
    /** Duration requested when acquiring a blob lease. */
    private static final int leaseDurationInSeconds = 30;
    /** Interval reported to callers via {@link #getLeaseRenewIntervalInMilliseconds()}. */
    private static final int leaseRenewIntervalInMilliseconds = 10000;

    private EventProcessorHost host;
    private final String storageConnectionString;
    private String storageContainerName;
    private String storageBlobPrefix;
    private CloudBlobClient storageClient;
    private CloudBlobContainer eventHubContainer;
    private CloudBlobDirectory consumerGroupDirectory;
    private Gson gson;
    /** Options passed to renewLease; left at their defaults. */
    private final BlobRequestOptions renewRequestOptions = new BlobRequestOptions();
    /** Most recent checkpoint seen (downloaded or uploaded) per partition id. */
    private final Hashtable<String, Checkpoint> latestCheckpoint = new Hashtable<>();

    /**
     * Creates a manager that uses the default container name and no blob prefix.
     *
     * @param storageConnectionString Azure Storage connection string; must not be null or blank
     */
    AzureStorageCheckpointLeaseManager(String storageConnectionString) {
        this(storageConnectionString, null);
    }

    /**
     * Creates a manager with an explicit container name and no blob prefix.
     *
     * @param storageConnectionString Azure Storage connection string; must not be null or blank
     * @param storageContainerName container name, or null to use the event hub path
     */
    AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName) {
        this(storageConnectionString, storageContainerName, "");
    }

    /**
     * Creates a manager with an explicit container name and blob prefix.
     *
     * @param storageConnectionString Azure Storage connection string; must not be null or blank
     * @param storageContainerName container name, or null to use the event hub path
     * @param storageBlobPrefix prefix prepended to the consumer group name to form the blob
     *     directory name; null is treated as empty and the value is trimmed
     * @throws IllegalArgumentException if the connection string is null/blank, or the container
     *     name is non-null but blank
     */
    AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName, String storageBlobPrefix) {
        if (storageConnectionString == null || storageConnectionString.trim().isEmpty()) {
            throw new IllegalArgumentException("Provide valid Azure Storage connection string when using Azure Storage");
        }
        this.storageConnectionString = storageConnectionString;
        if (storageContainerName != null && storageContainerName.trim().isEmpty()) {
            throw new IllegalArgumentException("Azure Storage container name must be a valid container name or null to use the default");
        }
        this.storageContainerName = storageContainerName;
        this.storageBlobPrefix = storageBlobPrefix != null ? storageBlobPrefix.trim() : "";
    }

    /**
     * Binds this manager to its host and builds the storage client, container reference and
     * consumer-group directory reference.
     *
     * @param host the owning host; supplies the event hub path (default container name) and the
     *     consumer group name
     */
    void initialize(EventProcessorHost host) throws InvalidKeyException, URISyntaxException, StorageException {
        this.host = host;
        if (this.storageContainerName == null) {
            this.storageContainerName = this.host.getEventHubPath();
        }
        this.storageClient = CloudStorageAccount.parse(this.storageConnectionString).createCloudBlobClient();
        BlobRequestOptions options = new BlobRequestOptions();
        options.setMaximumExecutionTimeInMs(storageMaximumExecutionTimeInMs);
        this.storageClient.setDefaultRequestOptions(options);
        this.eventHubContainer = this.storageClient.getContainerReference(this.storageContainerName);
        // The prefix is either empty or user supplied; in both cases plain concatenation is enough.
        this.consumerGroupDirectory = this.eventHubContainer.getDirectoryReference(this.storageBlobPrefix + this.host.getConsumerGroupName());
        this.gson = new Gson();
    }

    /** Checkpoints live in the lease blobs, so this delegates to {@link #leaseStoreExists()}. */
    @Override
    public Future<Boolean> checkpointStoreExists() {
        return this.leaseStoreExists();
    }

    /** Checkpoints live in the lease blobs, so this delegates to {@link #createLeaseStoreIfNotExists()}. */
    @Override
    public Future<Boolean> createCheckpointStoreIfNotExists() {
        return this.createLeaseStoreIfNotExists();
    }

    /** Checkpoints live in the lease blobs, so this delegates to {@link #deleteLeaseStore()}. */
    @Override
    public Future<Boolean> deleteCheckpointStore() {
        return this.deleteLeaseStore();
    }

    /** Asynchronously reads the checkpoint for a partition; see {@link #getCheckpointSync(String)}. */
    @Override
    public Future<Checkpoint> getCheckpoint(String partitionId) {
        return EventProcessorHost.getExecutorService().submit(() -> this.getCheckpointSync(partitionId));
    }

    /**
     * Reads the checkpoint stored in the partition's lease blob.
     *
     * @return the checkpoint, or null when the lease blob does not exist or holds no offset
     */
    private Checkpoint getCheckpointSync(String partitionId) throws URISyntaxException, IOException, StorageException {
        AzureBlobLease lease = this.getLeaseSync(partitionId);
        // getLeaseSync returns null when the blob is missing; that simply means "no checkpoint".
        if (lease == null || lease.getOffset() == null) {
            return null;
        }
        Checkpoint checkpoint = new Checkpoint(partitionId);
        checkpoint.setOffset(lease.getOffset());
        checkpoint.setSequenceNumber(lease.getSequenceNumber());
        return checkpoint;
    }

    /** Asynchronous wrapper around {@link #createCheckpointIfNotExistsSync(String)}. */
    @Override
    public Future<Checkpoint> createCheckpointIfNotExists(String partitionId) {
        return EventProcessorHost.getExecutorService().submit(() -> this.createCheckpointIfNotExistsSync(partitionId));
    }

    /**
     * Ensures the lease blob for the partition exists and returns the checkpoint it holds.
     *
     * @return the checkpoint, or null when the lease holds no offset
     */
    private Checkpoint createCheckpointIfNotExistsSync(String partitionId) throws Exception {
        AzureBlobLease lease = this.createLeaseIfNotExistsSync(partitionId);
        // The lease can be null if the blob disappeared between the failed create and the re-read.
        if (lease == null || lease.getOffset() == null) {
            return null;
        }
        return new Checkpoint(partitionId, lease.getOffset(), lease.getSequenceNumber());
    }

    /** Asynchronous wrapper around {@link #updateCheckpointSync(Checkpoint)}. */
    @Override
    public Future<Void> updateCheckpoint(Checkpoint checkpoint) {
        return EventProcessorHost.getExecutorService().submit(() -> this.updateCheckpointSync(checkpoint));
    }

    /**
     * Writes the checkpoint's offset and sequence number into the partition's lease blob.
     *
     * @throws IllegalStateException if no lease blob exists for the checkpoint's partition
     */
    private Void updateCheckpointSync(Checkpoint checkpoint) throws Exception {
        AzureBlobLease lease = this.getLeaseSync(checkpoint.getPartitionId());
        if (lease == null) {
            // Fail with a clear message instead of a NullPointerException on the setters below.
            throw new IllegalStateException("Cannot update checkpoint: no lease exists for partition " + checkpoint.getPartitionId());
        }
        this.host.logWithHostAndPartition(Level.FINER, checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber());
        lease.setOffset(checkpoint.getOffset());
        lease.setSequenceNumber(checkpoint.getSequenceNumber());
        this.updateLeaseSync(lease);
        return null;
    }

    /** Asynchronous wrapper around {@link #deleteCheckpointSync(String)}. */
    @Override
    public Future<Void> deleteCheckpoint(String partitionId) {
        return EventProcessorHost.getExecutorService().submit(() -> this.deleteCheckpointSync(partitionId));
    }

    /**
     * Clears the offset and sequence number stored in the partition's lease blob. Does nothing
     * when the lease blob does not exist.
     */
    private Void deleteCheckpointSync(String partitionId) throws Exception {
        AzureBlobLease lease = this.getLeaseSync(partitionId);
        if (lease == null) {
            // No lease blob means there is no stored checkpoint to delete.
            return null;
        }
        this.host.logWithHostAndPartition(Level.FINER, partitionId, "Deleting checkpoint for " + partitionId);
        lease.setOffset(null);
        lease.setSequenceNumber(0L);
        // uploadLease refills a null offset from the cache (which getLeaseSync just populated);
        // drop the cached entry so the cleared values are what actually gets written.
        this.latestCheckpoint.remove(partitionId);
        this.updateLeaseSync(lease);
        return null;
    }

    @Override
    public int getLeaseRenewIntervalInMilliseconds() {
        return leaseRenewIntervalInMilliseconds;
    }

    @Override
    public int getLeaseDurationInMilliseconds() {
        return leaseDurationInSeconds * 1000;
    }

    /** Asynchronously checks whether the storage container exists. */
    @Override
    public Future<Boolean> leaseStoreExists() {
        return EventProcessorHost.getExecutorService().submit(() -> this.eventHubContainer.exists());
    }

    /** Asynchronously creates the storage container if it does not exist. */
    @Override
    public Future<Boolean> createLeaseStoreIfNotExists() {
        return EventProcessorHost.getExecutorService().submit(() -> this.eventHubContainer.createIfNotExists());
    }

    /** Asynchronous wrapper around {@link #deleteLeaseStoreSync()}. */
    @Override
    public Future<Boolean> deleteLeaseStore() {
        return EventProcessorHost.getExecutorService().submit(() -> this.deleteLeaseStoreSync());
    }

    /**
     * Deletes the block blobs in the container (top level and one directory level down) and then
     * the container itself. Failures are logged and reported through the return value rather
     * than thrown, so the remaining items are still attempted.
     *
     * @return true if every delete succeeded, false if any of them failed
     */
    private Boolean deleteLeaseStoreSync() {
        boolean retval = true;
        for (ListBlobItem blob : this.eventHubContainer.listBlobs()) {
            if (blob instanceof CloudBlobDirectory) {
                try {
                    for (ListBlobItem subBlob : ((CloudBlobDirectory) blob).listBlobs()) {
                        // Guard the cast: a nested directory or non-block blob must not abort the sweep.
                        if (subBlob instanceof CloudBlockBlob) {
                            ((CloudBlockBlob) subBlob).deleteIfExists();
                        }
                    }
                } catch (StorageException | URISyntaxException e) {
                    this.host.logWithHost(Level.WARNING, "Failure while deleting lease store", e);
                    retval = false;
                }
            } else if (blob instanceof CloudBlockBlob) {
                try {
                    ((CloudBlockBlob) blob).deleteIfExists();
                } catch (StorageException e) {
                    this.host.logWithHost(Level.WARNING, "Failure while deleting lease store", e);
                    retval = false;
                }
            }
        }
        try {
            this.eventHubContainer.deleteIfExists();
        } catch (StorageException e) {
            this.host.logWithHost(Level.WARNING, "Failure while deleting lease store", e);
            retval = false;
        }
        return retval;
    }

    /** Asynchronous wrapper around {@link #getLeaseSync(String)}. */
    @Override
    public Future<Lease> getLease(String partitionId) {
        return EventProcessorHost.getExecutorService().submit(() -> this.getLeaseSync(partitionId));
    }

    /**
     * Downloads the lease stored for a partition.
     *
     * @return the lease, or null when the partition's blob does not exist
     */
    private AzureBlobLease getLeaseSync(String partitionId) throws URISyntaxException, IOException, StorageException {
        CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
        return leaseBlob.exists() ? this.downloadLease(leaseBlob) : null;
    }

    /** Starts one {@link #getLease(String)} per partition id known to the host's partition manager. */
    @Override
    public Iterable<Future<Lease>> getAllLeases() throws IllegalEntityException {
        ArrayList<Future<Lease>> leaseFutures = new ArrayList<>();
        for (String id : this.host.getPartitionManager().getPartitionIds()) {
            leaseFutures.add(this.getLease(id));
        }
        return leaseFutures;
    }

    /** Asynchronous wrapper around {@link #createLeaseIfNotExistsSync(String)}. */
    @Override
    public Future<Lease> createLeaseIfNotExists(String partitionId) {
        return EventProcessorHost.getExecutorService().submit(() -> this.createLeaseIfNotExistsSync(partitionId));
    }

    /**
     * Creates the lease blob for a partition, or returns the existing lease when the blob is
     * already there.
     *
     * @return the newly created lease, or the stored one when creation was rejected because the
     *     blob already exists
     * @throws StorageException for any storage failure other than "already exists"
     */
    private AzureBlobLease createLeaseIfNotExistsSync(String partitionId) throws URISyntaxException, IOException, StorageException {
        AzureBlobLease returnLease;
        try {
            CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
            returnLease = new AzureBlobLease(partitionId, leaseBlob);
            this.host.logWithHostAndPartition(Level.FINE, partitionId, "CreateLeaseIfNotExist - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.host.getConsumerGroupName() + " storageBlobPrefix: " + this.storageBlobPrefix);
            // If-None-Match: * makes the upload fail instead of overwriting an existing blob.
            this.uploadLease(returnLease, leaseBlob, AccessCondition.generateIfNoneMatchCondition("*"), UploadActivity.Create);
        } catch (StorageException se) {
            StorageExtendedErrorInformation extendedErrorInfo = se.getExtendedErrorInformation();
            String errorCode = extendedErrorInfo != null ? extendedErrorInfo.getErrorCode() : null;
            // LeaseIdMissing is reported when the blob exists and is currently leased by someone.
            if ("BlobAlreadyExists".equals(errorCode) || "LeaseIdMissing".equals(errorCode)) {
                this.host.logWithHostAndPartition(Level.FINE, partitionId, "Lease already exists");
                returnLease = this.getLeaseSync(partitionId);
            } else {
                this.host.logWithHostAndPartition(Level.SEVERE, partitionId, "CreateLeaseIfNotExist StorageException - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.host.getConsumerGroupName() + " storageBlobPrefix: " + this.storageBlobPrefix, (Throwable) se);
                throw se;
            }
        }
        return returnLease;
    }

    /** Asynchronous wrapper around {@link #deleteLeaseSync(AzureBlobLease)}. */
    @Override
    public Future<Void> deleteLease(Lease lease) {
        return EventProcessorHost.getExecutorService().submit(() -> this.deleteLeaseSync((AzureBlobLease) lease));
    }

    /** Deletes the blob backing the given lease, if it exists. */
    private Void deleteLeaseSync(AzureBlobLease lease) throws StorageException {
        this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Deleting lease");
        lease.getBlob().deleteIfExists();
        return null;
    }

    /** Asynchronous wrapper around {@link #acquireLeaseSync(AzureBlobLease)}. */
    @Override
    public Future<Boolean> acquireLease(Lease lease) {
        return EventProcessorHost.getExecutorService().submit(() -> this.acquireLeaseSync((AzureBlobLease) lease));
    }

    /**
     * Takes ownership of the lease for this host. If the blob is currently leased, the lease id
     * is changed using the token held in {@code lease}; otherwise a new blob lease is acquired.
     * On success the lease's token, owner and epoch are updated and the lease is uploaded.
     *
     * @return true if the lease was acquired, false if it was lost to another holder
     * @throws StorageException for storage failures that do not indicate a lost lease
     */
    private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception {
        this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Acquiring lease");
        CloudBlockBlob leaseBlob = lease.getBlob();
        String newLeaseId = EventProcessorHost.safeCreateUUID();
        if (newLeaseId == null || newLeaseId.isEmpty()) {
            throw new IllegalArgumentException("acquireLeaseSync: newLeaseId really is " + (newLeaseId == null ? "null" : "empty"));
        }
        try {
            String newToken;
            leaseBlob.downloadAttributes();
            if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED) {
                this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "changeLease");
                newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
            } else {
                this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "acquireLease");
                newToken = leaseBlob.acquireLease(leaseDurationInSeconds, newLeaseId);
            }
            lease.setToken(newToken);
            lease.setOwner(this.host.getHostName());
            lease.incrementEpoch();
            this.uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire);
        } catch (StorageException se) {
            // A lost lease is an expected outcome reported as false; anything else is an error.
            if (!this.wasLeaseLost(se, lease.getPartitionId())) {
                throw se;
            }
            return false;
        }
        return true;
    }

    /** Asynchronous wrapper around {@link #renewLeaseSync(AzureBlobLease)}. */
    @Override
    public Future<Boolean> renewLease(Lease lease) {
        return EventProcessorHost.getExecutorService().submit(() -> this.renewLeaseSync((AzureBlobLease) lease));
    }

    /**
     * Renews the blob lease identified by the lease's token.
     *
     * @return true if the lease was renewed, false if it was lost to another holder
     * @throws StorageException for storage failures that do not indicate a lost lease
     */
    private Boolean renewLeaseSync(AzureBlobLease lease) throws Exception {
        this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Renewing lease");
        CloudBlockBlob leaseBlob = lease.getBlob();
        try {
            leaseBlob.renewLease(AccessCondition.generateLeaseCondition(lease.getToken()), this.renewRequestOptions, null);
        } catch (StorageException se) {
            if (!this.wasLeaseLost(se, lease.getPartitionId())) {
                throw se;
            }
            return false;
        }
        return true;
    }

    /** Asynchronous wrapper around {@link #releaseLeaseSync(AzureBlobLease)}. */
    @Override
    public Future<Boolean> releaseLease(Lease lease) {
        return EventProcessorHost.getExecutorService().submit(() -> this.releaseLeaseSync((AzureBlobLease) lease));
    }

    /**
     * Releases the lease: uploads a copy with token and owner blanked, then releases the blob
     * lease. The caller's {@code lease} object itself is not modified.
     *
     * @return true if the lease was released, false if it had already been lost
     * @throws StorageException for storage failures that do not indicate a lost lease
     */
    private Boolean releaseLeaseSync(AzureBlobLease lease) throws Exception {
        this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Releasing lease");
        CloudBlockBlob leaseBlob = lease.getBlob();
        try {
            String leaseId = lease.getToken();
            AzureBlobLease releasedCopy = new AzureBlobLease(lease);
            releasedCopy.setToken("");
            releasedCopy.setOwner("");
            // Upload first, while the lease id is still valid for writing to the blob.
            this.uploadLease(releasedCopy, leaseBlob, AccessCondition.generateLeaseCondition(leaseId), UploadActivity.Release);
            leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
        } catch (StorageException se) {
            if (!this.wasLeaseLost(se, lease.getPartitionId())) {
                throw se;
            }
            return false;
        }
        return true;
    }

    /** Asynchronous wrapper around {@link #updateLeaseSync(AzureBlobLease)}. */
    @Override
    public Future<Boolean> updateLease(Lease lease) {
        return EventProcessorHost.getExecutorService().submit(() -> this.updateLeaseSync((AzureBlobLease) lease));
    }

    /**
     * Renews the lease and then uploads its current contents.
     *
     * @return false if {@code lease} is null, has no token, or could not be renewed because it
     *     was lost; true once the upload succeeded
     * @throws LeaseLostException if the lease is lost between the renew and the upload
     * @throws StorageException for storage failures that do not indicate a lost lease
     */
    public Boolean updateLeaseSync(AzureBlobLease lease) throws Exception {
        if (lease == null) {
            return false;
        }
        this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Updating lease");
        String token = lease.getToken();
        if (token == null || token.length() == 0) {
            return false;
        }
        // Renew first so that the conditional upload below is made with a live lease.
        if (!this.renewLeaseSync(lease).booleanValue()) {
            return false;
        }
        CloudBlockBlob leaseBlob = lease.getBlob();
        try {
            this.uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(token), UploadActivity.Update);
        } catch (StorageException se) {
            if (this.wasLeaseLost(se, lease.getPartitionId())) {
                throw new LeaseLostException(lease, (Throwable) se);
            }
            throw se;
        }
        return true;
    }

    /**
     * Downloads and deserializes the lease stored in {@code blob}, and records its checkpoint
     * (when it has an offset) in the {@code latestCheckpoint} cache.
     */
    private AzureBlobLease downloadLease(CloudBlockBlob blob) throws StorageException, IOException {
        String jsonLease = blob.downloadText();
        this.host.logWithHost(Level.FINEST, "Raw JSON downloaded: " + jsonLease);
        AzureBlobLease rehydrated = this.gson.fromJson(jsonLease, AzureBlobLease.class);
        AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob);
        if (blobLease.getOffset() != null) {
            this.latestCheckpoint.put(blobLease.getPartitionId(), blobLease.getCheckpoint());
        }
        return blobLease;
    }

    /**
     * Serializes {@code lease} to JSON and uploads it to {@code blob} under {@code condition}.
     *
     * <p>For every activity except {@code Create}, the lease's checkpoint fields are first
     * reconciled with the cache: if the cached checkpoint has a higher sequence number, or the
     * lease has no offset, the cached values are copied into the lease so that older in-memory
     * values do not overwrite a newer checkpoint. Otherwise the lease's checkpoint becomes the
     * cached one.
     */
    private void uploadLease(AzureBlobLease lease, CloudBlockBlob blob, AccessCondition condition, UploadActivity activity) throws StorageException, IOException {
        if (activity != UploadActivity.Create) {
            Checkpoint cached = this.latestCheckpoint.get(lease.getPartitionId());
            if (cached != null && (cached.getSequenceNumber() > lease.getSequenceNumber() || lease.getOffset() == null)) {
                lease.setOffset(cached.getOffset());
                lease.setSequenceNumber(cached.getSequenceNumber());
                this.host.logWithHostAndPartition(Level.FINEST, lease.getPartitionId(), "Replacing stale offset/seqno while uploading lease");
            } else if (lease.getOffset() != null) {
                this.latestCheckpoint.put(lease.getPartitionId(), lease.getCheckpoint());
            }
        }
        String jsonLease = this.gson.toJson(lease);
        blob.uploadText(jsonLease, null, condition, null, null);
        this.host.logWithHostAndPartition(Level.FINEST, lease.getPartitionId(), "Raw JSON uploading for " + activity + ": " + jsonLease);
    }

    /**
     * Decides whether a storage failure means the lease is held by someone else.
     *
     * @return true when the HTTP status is 409 or 412 and the extended error code is one of
     *     LeaseLost, LeaseIdMismatchWithLeaseOperation, LeaseIdMismatchWithBlobOperation or
     *     LeaseAlreadyPresent; false otherwise
     */
    private boolean wasLeaseLost(StorageException se, String partitionId) {
        StorageExtendedErrorInformation extendedErrorInfo = se.getExtendedErrorInformation();
        int httpStatus = se.getHttpStatusCode();
        this.host.logWithHostAndPartition(Level.FINER, partitionId, "WAS LEASE LOST?");
        this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + httpStatus);
        if (extendedErrorInfo != null) {
            this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + extendedErrorInfo.getErrorCode() + " :: " + extendedErrorInfo.getErrorMessage());
        }
        if ((httpStatus != 409 && httpStatus != 412) || extendedErrorInfo == null) {
            return false;
        }
        String errorCode = extendedErrorInfo.getErrorCode();
        this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error code: " + errorCode);
        this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error message: " + extendedErrorInfo.getErrorMessage());
        // Constant-first equals keeps a null error code from throwing.
        return "LeaseLost".equals(errorCode)
                || "LeaseIdMismatchWithLeaseOperation".equals(errorCode)
                || "LeaseIdMismatchWithBlobOperation".equals(errorCode)
                || "LeaseAlreadyPresent".equals(errorCode);
    }

    /** The operation on whose behalf a lease is being uploaded; used for cache handling and logging. */
    private enum UploadActivity {
        Create,
        Acquire,
        Release,
        Update
    }
}

