/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import net.snowflake.client.core.ExecTimeTelemetryData;
import net.snowflake.client.core.HttpClientSettingsKey;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.core.OCSPMode;
import net.snowflake.client.jdbc.RestRequest;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.SnowflakeUtil;
import net.snowflake.client.jdbc.cloud.storage.SnowflakeStorageClient;
import net.snowflake.client.jdbc.cloud.storage.StageInfo;
import net.snowflake.client.jdbc.cloud.storage.StorageClientFactory;
import net.snowflake.client.jdbc.internal.apache.http.HttpEntity;
import net.snowflake.client.jdbc.internal.apache.http.client.HttpResponseException;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.CloseableHttpResponse;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPut;
import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpRequestBase;
import net.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
import net.snowflake.client.jdbc.internal.apache.http.entity.ByteArrayEntity;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
import net.snowflake.client.jdbc.internal.google.api.client.http.HttpStatusCodes;
import net.snowflake.ingest.connection.IngestResponseException;
import net.snowflake.ingest.streaming.internal.BlobPath;
import net.snowflake.ingest.streaming.internal.FileLocationInfo;
import net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsRequest;
import net.snowflake.ingest.streaming.internal.GeneratePresignedUrlsResponse;
import net.snowflake.ingest.streaming.internal.IStorage;
import net.snowflake.ingest.streaming.internal.InternalStage;
import net.snowflake.ingest.streaming.internal.SnowflakeFileTransferMetadataWithAge;
import net.snowflake.ingest.streaming.internal.SnowflakeServiceClient;
import net.snowflake.ingest.streaming.internal.TableRef;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;

class ExternalVolume
implements IStorage {
    private static final Logging logger = new Logging(ExternalVolume.class);
    private static final int DEFAULT_PRESIGNED_URL_COUNT = 10;
    private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900;
    private static final int MAX_CONCURRENT_GENERATE_URLS_REQUESTS = 10;
    private static final int LOW_WATERMARK_FOR_EARLY_REFRESH = 5;
    private final String clientName;
    private final String clientPrefix;
    private final Long deploymentId;
    private final String role;
    private final TableRef tableRef;
    private final SnowflakeServiceClient serviceClient;
    private final Semaphore generateUrlsSemaphore;
    private final ConcurrentLinkedQueue<GeneratePresignedUrlsResponse.PresignedUrlInfo> presignedUrlInfos;
    private final AtomicInteger numUrlsInQueue;
    private final FileLocationInfo locationInfo;
    private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata;

    ExternalVolume(String clientName, String clientPrefix, Long deploymentId, String role, TableRef tableRef, FileLocationInfo locationInfo, SnowflakeServiceClient serviceClient) {
        this.clientName = clientName;
        this.clientPrefix = clientPrefix;
        this.deploymentId = deploymentId;
        this.role = role;
        this.tableRef = tableRef;
        this.serviceClient = serviceClient;
        this.locationInfo = locationInfo;
        this.presignedUrlInfos = new ConcurrentLinkedQueue();
        this.numUrlsInQueue = new AtomicInteger(0);
        this.generateUrlsSemaphore = new Semaphore(10);
        if (this.locationInfo.getIsClientSideEncrypted()) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Cannot ingest into an external volume that requests client side encryption");
        }
        if ("S3".equalsIgnoreCase(this.locationInfo.getLocationType())) {
            this.locationInfo.getCredentials().put("AWS_KEY_ID", "key");
            this.locationInfo.getCredentials().put("AWS_SECRET_KEY", "secret");
        }
        try {
            this.fileTransferMetadata = InternalStage.createFileTransferMetadataWithAge(this.locationInfo);
        }
        catch (com.fasterxml.jackson.core.JsonProcessingException | SnowflakeSQLException | JsonProcessingException e) {
            throw new SFException(e, ErrorCode.INTERNAL_ERROR, new Object[0]);
        }
        this.generateUrls(5, false);
    }

    @Override
    public void put(BlobPath blobPath, byte[] blob) {
        if (this.fileTransferMetadata.isLocalFS) {
            InternalStage.putLocal(this.fileTransferMetadata.localLocation, blobPath.fileName, blob);
            return;
        }
        try {
            this.putRemote(blobPath.blobPath, blob);
        }
        catch (Throwable e) {
            throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE, new Object[0]);
        }
    }

    private void putRemote(String blobPath, byte[] blob) throws SnowflakeSQLException, URISyntaxException, IOException {
        Properties proxyProperties = net.snowflake.ingest.utils.HttpUtil.generateProxyPropertiesForJDBC();
        HttpClientSettingsKey key = SnowflakeUtil.convertProxyPropertiesToHttpClientKey((OCSPMode)OCSPMode.FAIL_OPEN, (Properties)proxyProperties);
        StageInfo stageInfo = this.fileTransferMetadata.fileTransferMetadata.getStageInfo();
        SnowflakeStorageClient client = StorageClientFactory.getFactory().createClient(stageInfo, 1, null, null);
        URIBuilder uriBuilder = new URIBuilder(blobPath);
        HttpPut httpRequest = new HttpPut(uriBuilder.build());
        httpRequest.setEntity((HttpEntity)new ByteArrayEntity(blob));
        this.addHeadersToHttpRequest(httpRequest, blob, stageInfo, client);
        if (stageInfo.getStageType().equals((Object)StageInfo.StageType.AZURE)) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Azure based external volumes are not yet supported.");
        }
        CloseableHttpClient httpClient = HttpUtil.getHttpClient((HttpClientSettingsKey)key);
        CloseableHttpResponse response = RestRequest.execute((CloseableHttpClient)httpClient, (HttpRequestBase)httpRequest, (long)0L, (long)0L, (int)((int)HttpUtil.getSocketTimeout().toMillis()), (int)1, (int)0, null, (boolean)false, (boolean)false, (boolean)false, (boolean)true, (boolean)true, (ExecTimeTelemetryData)new ExecTimeTelemetryData());
        int statusCode = response.getStatusLine().getStatusCode();
        if (!HttpStatusCodes.isSuccess((int)statusCode)) {
            HttpResponseException ex = new HttpResponseException(response.getStatusLine().getStatusCode(), String.format("%s, body: %s", response.getStatusLine().getReasonPhrase(), EntityUtils.toString((HttpEntity)response.getEntity())));
            client.handleStorageException((Exception)ex, 0, "upload", null, null, null);
        }
    }

    private void addHeadersToHttpRequest(HttpPut httpRequest, byte[] blob, StageInfo stageInfo, SnowflakeStorageClient client) {
    }

    GeneratePresignedUrlsResponse.PresignedUrlInfo dequeueUrlInfo() {
        int remainingUrlsInQueue;
        GeneratePresignedUrlsResponse.PresignedUrlInfo info;
        long validUntilAtleastTimestamp = System.currentTimeMillis() + 60000L;
        while (true) {
            if ((info = this.presignedUrlInfos.poll()) == null) {
                this.generateUrls(5, true);
                continue;
            }
            remainingUrlsInQueue = this.numUrlsInQueue.decrementAndGet();
            if (info.validUntilTimestamp >= validUntilAtleastTimestamp) break;
        }
        if (remainingUrlsInQueue <= 5) {
            this.generateUrls(5, false);
        }
        return info;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void generateUrls(int minCountToSkipGeneration, boolean waitUntilAcquired) {
        int numAcquireAttempts = 0;
        boolean acquired = false;
        while (!acquired && numAcquireAttempts++ < 300) {
            try {
                int timeoutInSeconds = waitUntilAcquired ? 1 : 0;
                acquired = this.generateUrlsSemaphore.tryAcquire(timeoutInSeconds, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                boolean interrupted = Thread.interrupted();
                String message = String.format("Semaphore acquisition in ExternalVolume.generateUrls was interrupted, likely because the process is shutting down. TableRef=%s Thread.interrupted=%s", this.tableRef, interrupted);
                logger.logError(message);
                throw new SFException(ErrorCode.INTERNAL_ERROR, message);
            }
            if (this.numUrlsInQueue.get() >= minCountToSkipGeneration) {
                if (acquired) {
                    this.generateUrlsSemaphore.release();
                }
                return;
            }
            if (acquired || waitUntilAcquired) continue;
            logger.logDebug("Skipping generateUrls because semaphore acquisition failed AND waitUntilAcquired == false.");
            return;
        }
        if (!acquired) {
            String message = String.format("Could not acquire semaphore to generate URLs. TableRef=%s", this.tableRef);
            logger.logError(message);
            throw new SFException(ErrorCode.INTERNAL_ERROR, message);
        }
        try {
            long currentTimestamp = System.currentTimeMillis();
            long validUntilTimestamp = currentTimestamp + 900000L;
            GeneratePresignedUrlsResponse response = this.doGenerateUrls(900);
            List<GeneratePresignedUrlsResponse.PresignedUrlInfo> urlInfos = response.getPresignedUrlInfos();
            urlInfos = urlInfos.stream().map(info -> {
                info.validUntilTimestamp = validUntilTimestamp;
                return info;
            }).filter(info -> {
                if (info == null || info.url == null || info.fileName == null || info.url.isEmpty()) {
                    logger.logError("Received unexpected null or empty URL in externalVolume.generateUrls tableRef=%s", this.tableRef);
                    return false;
                }
                return true;
            }).collect(Collectors.toList());
            this.presignedUrlInfos.addAll(urlInfos);
            this.numUrlsInQueue.addAndGet(urlInfos.size());
        }
        finally {
            this.generateUrlsSemaphore.release();
        }
    }

    private GeneratePresignedUrlsResponse doGenerateUrls(int timeoutInSeconds) {
        try {
            return this.serviceClient.generatePresignedUrls(new GeneratePresignedUrlsRequest(this.tableRef, this.role, 10, timeoutInSeconds, this.deploymentId, true));
        }
        catch (IOException | IngestResponseException e) {
            throw new SFException(e, ErrorCode.GENERATE_PRESIGNED_URLS_FAILURE, e.getMessage());
        }
    }
}

