/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.kusto.ingest;

import com.azure.core.http.HttpClient;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.Utils;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.data.exceptions.ThrottleException;
import com.microsoft.azure.kusto.data.http.HttpClientFactory;
import com.microsoft.azure.kusto.data.http.HttpClientProperties;
import com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity;
import com.microsoft.azure.kusto.ingest.IngestionResourceManager;
import com.microsoft.azure.kusto.ingest.ResourceAlgorithms;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.resources.ContainerWithSas;
import com.microsoft.azure.kusto.ingest.resources.QueueWithSas;
import com.microsoft.azure.kusto.ingest.resources.RankedStorageAccount;
import com.microsoft.azure.kusto.ingest.resources.RankedStorageAccountSet;
import com.microsoft.azure.kusto.ingest.resources.ResourceWithSas;
import com.microsoft.azure.kusto.ingest.utils.TableWithSas;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.vavr.CheckedFunction0;
import java.io.Closeable;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.util.annotation.Nullable;

class ResourceManager
implements Closeable,
IngestionResourceManager {
    private static final long REFRESH_INGESTION_RESOURCES_PERIOD = TimeUnit.HOURS.toMillis(1L);
    private static final long REFRESH_INGESTION_RESOURCES_PERIOD_ON_FAILURE = TimeUnit.MINUTES.toMillis(1L);
    private static final long REFRESH_RESULT_POLL_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(15L);
    private final Client client;
    private final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private Timer refreshTasksTimer;
    private final ReadWriteLock ingestionResourcesLock = new ReentrantReadWriteLock();
    private final ReadWriteLock ingestionAuthTokenLock = new ReentrantReadWriteLock();
    private final ReadWriteLock ingestionResourcesSchedulingLock = new ReentrantReadWriteLock();
    private final ReadWriteLock ingestionAuthTokenSchedulingLock = new ReentrantReadWriteLock();
    private final Long defaultRefreshTime;
    private final Long refreshTimeOnFailure;
    private final HttpClient httpClient;
    private final RetryConfig taskRetryConfig;
    private RequestRetryOptions queueRequestOptions = null;
    private RankedStorageAccountSet storageAccountSet;
    private String identityToken;
    private IngestionResourceSet ingestionResourceSet;
    protected RefreshIngestionAuthTokenTask refreshIngestionAuthTokenTask;
    protected RefreshIngestionResourcesTask refreshIngestionResourcesTask;

    public ResourceManager(Client client, long defaultRefreshTime, long refreshTimeOnFailure, @Nullable HttpClient httpClient) {
        this.client = client;
        this.httpClient = httpClient == null ? HttpClientFactory.create((HttpClientProperties)HttpClientProperties.builder().build()) : httpClient;
        this.refreshTasksTimer = new Timer(true);
        this.defaultRefreshTime = defaultRefreshTime;
        this.refreshTimeOnFailure = refreshTimeOnFailure;
        this.taskRetryConfig = Utils.buildRetryConfig((Class[])new Class[]{ThrottleException.class});
        this.initRefreshTasks();
        this.storageAccountSet = new RankedStorageAccountSet();
    }

    public ResourceManager(Client client, @Nullable HttpClient httpClient) {
        this(client, REFRESH_INGESTION_RESOURCES_PERIOD, REFRESH_INGESTION_RESOURCES_PERIOD_ON_FAILURE, httpClient);
    }

    @Override
    public void close() {
        this.refreshTasksTimer.cancel();
        this.refreshTasksTimer.purge();
        this.refreshTasksTimer = null;
    }

    private void initRefreshTasks() {
        this.scheduleRefreshIngestionResourcesTask(0L);
        this.scheduleRefreshIngestionAuthTokenTask(0L);
    }

    private synchronized void scheduleRefreshIngestionResourcesTask(Long delay) {
        if (this.refreshTasksTimer != null) {
            if (this.refreshIngestionResourcesTask != null) {
                this.refreshIngestionResourcesTask.cancel();
            }
            this.refreshIngestionResourcesTask = new RefreshIngestionResourcesTask();
            this.refreshTasksTimer.schedule((TimerTask)this.refreshIngestionResourcesTask, delay, (long)this.defaultRefreshTime);
        }
    }

    private synchronized void scheduleRefreshIngestionAuthTokenTask(Long delay) {
        if (this.refreshTasksTimer != null) {
            if (this.refreshIngestionAuthTokenTask != null) {
                this.refreshIngestionAuthTokenTask.cancel();
            }
            this.refreshIngestionAuthTokenTask = new RefreshIngestionAuthTokenTask();
            this.refreshTasksTimer.schedule((TimerTask)this.refreshIngestionAuthTokenTask, delay, (long)this.defaultRefreshTime);
        }
    }

    @Override
    public List<ContainerWithSas> getShuffledContainers() throws IngestionServiceException {
        IngestionResource containers = this.getResourceSet(() -> this.ingestionResourceSet.containers);
        return ResourceAlgorithms.getShuffledResources(this.storageAccountSet.getRankedShuffledAccounts(), containers.getResourcesList());
    }

    public List<QueueWithSas> getShuffledQueues() throws IngestionServiceException {
        IngestionResource queues = this.getResourceSet(() -> this.ingestionResourceSet.queues);
        return ResourceAlgorithms.getShuffledResources(this.storageAccountSet.getRankedShuffledAccounts(), queues.getResourcesList());
    }

    public TableWithSas getStatusTable() throws IngestionServiceException {
        return (TableWithSas)this.getResource(() -> this.ingestionResourceSet.statusTable);
    }

    public QueueWithSas getFailedQueue() throws IngestionServiceException {
        return (QueueWithSas)this.getResource(() -> this.ingestionResourceSet.failedIngestionsQueues);
    }

    public QueueWithSas getSuccessfulQueue() throws IngestionServiceException {
        return (QueueWithSas)this.getResource(() -> this.ingestionResourceSet.successfulIngestionsQueues);
    }

    public String getIdentityToken() throws IngestionServiceException {
        if (this.identityToken == null) {
            if (this.ingestionAuthTokenSchedulingLock.writeLock().tryLock()) {
                try {
                    this.scheduleRefreshIngestionAuthTokenTask(0L);
                }
                finally {
                    this.ingestionAuthTokenSchedulingLock.writeLock().unlock();
                }
            }
            Boolean refreshedOnce = this.refreshIngestionAuthTokenTask.waitUntilRefreshedAtLeastOnce();
            if (this.identityToken == null) {
                ResourceManager.throwNoResultException("Unable to get Identity token", refreshedOnce);
            }
        }
        return this.identityToken;
    }

    public void setQueueRequestOptions(RequestRetryOptions queueRequestOptions) {
        this.queueRequestOptions = queueRequestOptions;
    }

    private <T> T getResource(Callable<IngestionResource<T>> resourceGetter) throws IngestionServiceException {
        return this.getResourceSet(resourceGetter).nextResource();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> IngestionResource<T> getResourceSet(Callable<IngestionResource<T>> resourceGetter) throws IngestionServiceException {
        IngestionResource<T> resource = null;
        try {
            resource = resourceGetter.call();
        }
        catch (Exception exception) {
            // empty catch block
        }
        if (resource == null || resource.empty()) {
            if (this.ingestionResourcesSchedulingLock.writeLock().tryLock()) {
                try {
                    this.scheduleRefreshIngestionResourcesTask(0L);
                }
                finally {
                    this.ingestionResourcesSchedulingLock.writeLock().unlock();
                }
            }
            Boolean refreshedOnce = this.refreshIngestionResourcesTask.waitUntilRefreshedAtLeastOnce();
            try {
                resource = resourceGetter.call();
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (resource == null || resource.empty()) {
                ResourceManager.throwNoResultException("Unable to get ingestion resources for this type: " + (resource == null ? "" : resource.resourceType), refreshedOnce);
            }
        }
        return resource;
    }

    private static void throwNoResultException(String baseMessage, Boolean refreshedOnce) throws IngestionServiceException {
        if (refreshedOnce == null) {
            baseMessage = baseMessage + " because thread checking refresh job timed out or was interrupted";
        } else if (!refreshedOnce.booleanValue()) {
            baseMessage = baseMessage + " because refresh job failed";
        }
        throw new IngestionServiceException(baseMessage);
    }

    @Override
    public void reportIngestionResult(ResourceWithSas<?> resource, boolean success) {
        if (this.storageAccountSet == null) {
            this.log.warn("StorageAccountSet is null");
            return;
        }
        this.storageAccountSet.addResultToAccount(resource.getAccountName(), success);
    }

    private static class IngestionResourceSet {
        IngestionResource<ContainerWithSas> containers = new IngestionResource(ResourceType.TEMP_STORAGE);
        IngestionResource<TableWithSas> statusTable = new IngestionResource(ResourceType.INGESTIONS_STATUS_TABLE);
        IngestionResource<QueueWithSas> queues = new IngestionResource(ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE);
        IngestionResource<QueueWithSas> successfulIngestionsQueues = new IngestionResource(ResourceType.SUCCESSFUL_INGESTIONS_QUEUE);
        IngestionResource<QueueWithSas> failedIngestionsQueues = new IngestionResource(ResourceType.FAILED_INGESTIONS_QUEUE);

        private IngestionResourceSet() {
        }
    }

    private static class IngestionResource<T> {
        final ResourceType resourceType;
        int roundRobinIdx = 0;
        List<T> resourcesList;

        IngestionResource(ResourceType resourceType) {
            this.resourceType = resourceType;
            this.resourcesList = new ArrayList<T>();
        }

        public List<T> getResourcesList() {
            return this.resourcesList;
        }

        void addResource(T resource) {
            this.resourcesList.add(resource);
        }

        T nextResource() {
            this.roundRobinIdx = (this.roundRobinIdx + 1) % this.resourcesList.size();
            return this.resourcesList.get(this.roundRobinIdx);
        }

        boolean empty() {
            return this.resourcesList.isEmpty();
        }
    }

    static enum ResourceType {
        SECURED_READY_FOR_AGGREGATION_QUEUE("SecuredReadyForAggregationQueue"),
        FAILED_INGESTIONS_QUEUE("FailedIngestionsQueue"),
        SUCCESSFUL_INGESTIONS_QUEUE("SuccessfulIngestionsQueue"),
        TEMP_STORAGE("TempStorage"),
        INGESTIONS_STATUS_TABLE("IngestionsStatusTable");

        private final String resourceTypeName;

        private ResourceType(String resourceTypeName) {
            this.resourceTypeName = resourceTypeName;
        }

        public static ResourceType findByResourceTypeName(String resourceTypeName) {
            for (ResourceType resourceType : ResourceType.values()) {
                if (!resourceType.resourceTypeName.equalsIgnoreCase(resourceTypeName)) continue;
                return resourceType;
            }
            throw new IllegalArgumentException(resourceTypeName);
        }

        String getResourceTypeName() {
            return this.resourceTypeName;
        }
    }

    class RefreshIngestionAuthTokenTask
    extends RefreshResourceTask {
        RefreshIngestionAuthTokenTask() {
        }

        @Override
        public void run() {
            try {
                MonitoredActivity.invoke(() -> {
                    this.refreshIngestionAuthToken();
                    return null;
                }, (String)"ResourceManager.refreshIngestionAuthToken");
            }
            catch (Exception e) {
                ResourceManager.this.log.error("Error in refreshIngestionAuthToken: " + e.getMessage(), (Throwable)e);
                ResourceManager.this.scheduleRefreshIngestionAuthTokenTask(ResourceManager.this.refreshTimeOnFailure);
            }
        }

        private void refreshIngestionAuthToken() throws IngestionClientException, IngestionServiceException {
            if (ResourceManager.this.ingestionAuthTokenLock.writeLock().tryLock()) {
                try {
                    ResourceManager.this.log.info("Refreshing Ingestion Auth Token");
                    Retry retry = Retry.of((String)"get Ingestion Auth Token resources", (RetryConfig)ResourceManager.this.taskRetryConfig);
                    CheckedFunction0 retryExecute = Retry.decorateCheckedSupplier((Retry)retry, (CheckedFunction0 & Serializable)() -> ResourceManager.this.client.executeMgmt(".get kusto identity token"));
                    KustoOperationResult identityTokenResult = (KustoOperationResult)retryExecute.apply();
                    if (identityTokenResult != null && identityTokenResult.hasNext() && !identityTokenResult.getResultTables().isEmpty()) {
                        KustoResultSetTable resultTable = identityTokenResult.next();
                        resultTable.next();
                        ResourceManager.this.identityToken = resultTable.getString(0);
                    }
                    this.refreshedAtLeastOnce.clear();
                    this.refreshedAtLeastOnce.put(true);
                    ResourceManager.this.log.info("Refreshing Ingestion Auth Token Finished");
                }
                catch (DataServiceException e) {
                    throw new IngestionServiceException(e.getIngestionSource(), "Error refreshing IngestionAuthToken. " + e.getMessage(), (Exception)((Object)e));
                }
                catch (DataClientException e) {
                    throw new IngestionClientException(e.getIngestionSource(), "Error refreshing IngestionAuthToken. " + e.getMessage(), (Exception)((Object)e));
                }
                catch (Throwable e) {
                    throw new IngestionClientException(e.getMessage(), e);
                }
                finally {
                    ResourceManager.this.ingestionAuthTokenLock.writeLock().unlock();
                }
            }
        }
    }

    class RefreshIngestionResourcesTask
    extends RefreshResourceTask {
        RefreshIngestionResourcesTask() {
        }

        @Override
        public void run() {
            try {
                MonitoredActivity.invoke(() -> {
                    this.refreshIngestionResources();
                    return null;
                }, (String)"ResourceManager.refreshIngestionResource");
            }
            catch (Exception e) {
                ResourceManager.this.log.error("Error in refreshIngestionResources: " + e.getMessage(), (Throwable)e);
                ResourceManager.this.scheduleRefreshIngestionResourcesTask(ResourceManager.this.refreshTimeOnFailure);
            }
        }

        private void refreshIngestionResources() throws IngestionClientException, IngestionServiceException {
            if (ResourceManager.this.ingestionResourcesLock.writeLock().tryLock()) {
                try {
                    ResourceManager.this.log.info("Refreshing Ingestion Resources");
                    IngestionResourceSet newIngestionResourceSet = new IngestionResourceSet();
                    Retry retry = Retry.of((String)"get ingestion resources", (RetryConfig)ResourceManager.this.taskRetryConfig);
                    CheckedFunction0 retryExecute = Retry.decorateCheckedSupplier((Retry)retry, (CheckedFunction0 & Serializable)() -> ResourceManager.this.client.executeMgmt(".show ingestion resources"));
                    KustoOperationResult ingestionResourcesResults = (KustoOperationResult)retryExecute.apply();
                    if (ingestionResourcesResults != null) {
                        KustoResultSetTable table = ingestionResourcesResults.getPrimaryResults();
                        while (table.next()) {
                            String resourceTypeName = table.getString(0);
                            String storageUrl = table.getString(1);
                            this.addIngestionResource(newIngestionResourceSet, resourceTypeName, storageUrl);
                        }
                    }
                    this.populateStorageAccounts(newIngestionResourceSet);
                    ResourceManager.this.ingestionResourceSet = newIngestionResourceSet;
                    this.refreshedAtLeastOnce.clear();
                    this.refreshedAtLeastOnce.put(true);
                    ResourceManager.this.log.info("Refreshing Ingestion Resources Finished");
                }
                catch (DataServiceException e) {
                    throw new IngestionServiceException(e.getIngestionSource(), "Error refreshing IngestionResources. " + e.getMessage(), (Exception)((Object)e));
                }
                catch (DataClientException e) {
                    throw new IngestionClientException(e.getIngestionSource(), "Error refreshing IngestionResources. " + e.getMessage(), (Exception)((Object)e));
                }
                catch (Throwable e) {
                    throw new IngestionClientException(e.getMessage(), e);
                }
                finally {
                    ResourceManager.this.ingestionResourcesLock.writeLock().unlock();
                }
            }
        }

        private void addIngestionResource(IngestionResourceSet ingestionResourceSet, String resourceTypeName, String storageUrl) throws URISyntaxException {
            ResourceType resourceType = ResourceType.findByResourceTypeName(resourceTypeName);
            switch (resourceType) {
                case TEMP_STORAGE: {
                    ingestionResourceSet.containers.addResource(new ContainerWithSas(storageUrl, ResourceManager.this.httpClient));
                    break;
                }
                case INGESTIONS_STATUS_TABLE: {
                    ingestionResourceSet.statusTable.addResource(new TableWithSas(storageUrl, ResourceManager.this.httpClient));
                    break;
                }
                case SECURED_READY_FOR_AGGREGATION_QUEUE: {
                    ingestionResourceSet.queues.addResource(new QueueWithSas(storageUrl, ResourceManager.this.httpClient, ResourceManager.this.queueRequestOptions));
                    break;
                }
                case SUCCESSFUL_INGESTIONS_QUEUE: {
                    ingestionResourceSet.successfulIngestionsQueues.addResource(new QueueWithSas(storageUrl, ResourceManager.this.httpClient, ResourceManager.this.queueRequestOptions));
                    break;
                }
                case FAILED_INGESTIONS_QUEUE: {
                    ingestionResourceSet.failedIngestionsQueues.addResource(new QueueWithSas(storageUrl, ResourceManager.this.httpClient, ResourceManager.this.queueRequestOptions));
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected value: " + (Object)((Object)resourceType));
                }
            }
        }

        private void populateStorageAccounts(IngestionResourceSet ingestionResourceSet) {
            RankedStorageAccountSet tempAccount = new RankedStorageAccountSet();
            Stream queueStream = ingestionResourceSet.queues == null ? Stream.empty() : ingestionResourceSet.queues.getResourcesList().stream();
            Stream containerStream = ingestionResourceSet.containers == null ? Stream.empty() : ingestionResourceSet.containers.getResourcesList().stream();
            Stream.concat(queueStream, containerStream).forEach(resource -> {
                String accountName = resource.getAccountName();
                if (tempAccount.getAccount(accountName) != null) {
                    return;
                }
                RankedStorageAccount previousAccount = ResourceManager.this.storageAccountSet.getAccount(accountName);
                if (previousAccount != null) {
                    tempAccount.addAccount(previousAccount);
                } else {
                    tempAccount.addAccount(accountName);
                }
            });
            ResourceManager.this.storageAccountSet = tempAccount;
        }
    }

    static abstract class RefreshResourceTask
    extends TimerTask {
        protected final BlockingQueue<Boolean> refreshedAtLeastOnce = new LinkedBlockingDeque<Boolean>();

        RefreshResourceTask() {
        }

        public Boolean waitUntilRefreshedAtLeastOnce() {
            return this.waitUntilRefreshedAtLeastOnce(REFRESH_RESULT_POLL_TIMEOUT_MILLIS);
        }

        public Boolean waitUntilRefreshedAtLeastOnce(long timeoutMillis) {
            try {
                Boolean refreshedAtLeastOncePollResult = this.refreshedAtLeastOnce.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (refreshedAtLeastOncePollResult != null) {
                    this.refreshedAtLeastOnce.put(refreshedAtLeastOncePollResult);
                    return refreshedAtLeastOncePollResult;
                }
                return null;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}

