/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.client.jdbc;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.snowflake.client.core.ChunkDownloader;
import net.snowflake.client.core.DownloaderMetrics;
import net.snowflake.client.core.HttpClientSettingsKey;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.core.OCSPMode;
import net.snowflake.client.core.ObjectMapperFactory;
import net.snowflake.client.core.QueryResultFormat;
import net.snowflake.client.core.SFArrowResultSet;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SessionUtil;
import net.snowflake.client.jdbc.ArrowResultChunk;
import net.snowflake.client.jdbc.ChunkDownloadContext;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.JsonResultChunk;
import net.snowflake.client.jdbc.ResultJsonParserV2;
import net.snowflake.client.jdbc.ResultStreamProvider;
import net.snowflake.client.jdbc.SnowflakeConnectString;
import net.snowflake.client.jdbc.SnowflakeResultChunk;
import net.snowflake.client.jdbc.SnowflakeResultSetSerializableV1;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.SnowflakeSQLLoggedException;
import net.snowflake.client.jdbc.internal.apache.arrow.memory.RootAllocator;
import net.snowflake.client.jdbc.telemetryOOB.TelemetryService;
import net.snowflake.client.log.SFLogger;
import net.snowflake.client.log.SFLoggerFactory;

public class SnowflakeChunkDownloader
implements ChunkDownloader {
    // Shared JSON infrastructure and the class logger.
    private static final ObjectMapper mapper = ObjectMapperFactory.getObjectMapper();
    private static final JsonFactory jsonFactory = new MappingJsonFactory();
    private static final SFLogger logger = SFLoggerFactory.getLogger(SnowflakeChunkDownloader.class);
    // 0x100000 = 1 MiB. NOTE(review): not referenced in the visible part of the file — presumably a stream read buffer size; confirm.
    private static final int STREAM_BUFFER_SIZE = 0x100000;
    // Same value as the 3-second awaitTermination() timeout used in terminate().
    private static final long SHUTDOWN_TIME = 3L;
    // Connection/HTTP settings copied from the serializable result set in the constructor.
    private final SnowflakeConnectString snowflakeConnectionString;
    private final OCSPMode ocspMode;
    private final HttpClientSettingsKey ocspModeAndProxyKey;
    // May be null: the constructor stores null when the serializable carries no session.
    private SFBaseSession session;
    // Holds data of consumed JSON chunks so that later JSON chunks can try to reuse it.
    private JsonResultChunk.ResultChunkDataCache chunkDataCache = new JsonResultChunk.ResultChunkDataCache();
    // All chunks of the result; set to null once terminate() has run.
    private List<SnowflakeResultChunk> chunks;
    // Index of the next chunk handed out by getNextChunkToConsume().
    private int nextChunkToConsume = 0;
    // Index of the next chunk to be submitted to the executor.
    private int nextChunkToDownload = 0;
    // Maximum distance between nextChunkToDownload and nextChunkToConsume (2 x prefetch threads).
    private final int prefetchSlots;
    // Pool that runs the download tasks.
    private final ThreadPoolExecutor executor;
    // Timing metrics reported through DownloaderMetrics in terminate().
    private long numberMillisWaitingForChunks = 0L;
    // Guards terminate() so its body runs at most once.
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private final AtomicLong totalMillisDownloadingChunks = new AtomicLong(0L);
    private final AtomicLong totalMillisParsingChunks = new AtomicLong(0L);
    // Values forwarded unchanged to every download task.
    private final String qrmk;
    private Map<String, String> chunkHeadersMap;
    private final int networkTimeoutInMilli;
    private final int authTimeout;
    private final int socketTimeout;
    private final int maxHttpRetries;
    // Memory budget for chunk reservations; may be raised when a single chunk needs more.
    private long memoryLimit;
    // Static, hence shared by every downloader instance in the JVM: total memory currently reserved for chunks.
    private static final AtomicLong currentMemoryUsage = new AtomicLong();
    // Latest submitted download task per chunk index; used to cancel tasks.
    private Map<Integer, Future> downloaderFutures = new ConcurrentHashMap<Integer, Future>();
    private QueryResultFormat queryResultFormat;
    // Only assigned for ARROW results.
    private RootAllocator rootAllocator;
    private final String queryId;
    private final int firstChunkRowCount;
    // Back-off parameters used by startNextDownloaders() while waiting for memory:
    // start value, growth factor, cap, and divisor that bounds the random jitter.
    private long BASE_WAITING_MS = 50L;
    private long WAITING_SECS_MULTIPLIER = 2L;
    private long MAX_WAITING_MS = 30000L;
    private long WAITING_JITTER_RATIO = 10L;
    private final ResultStreamProvider resultStreamProvider;
    // How long the consumer waits on a chunk's download condition before treating it as timed out.
    private static final long downloadedConditionTimeoutInSeconds = HttpUtil.getDownloadedConditionTimeoutInSeconds();
    // Equals the bound of the random pause (in ms) taken in waitForChunkReady() before a download is re-submitted.
    private static final int MAX_RETRY_JITTER = 1000;
    // Number of memory-wait retries allowed in startNextDownloaders(); the constructor overrides it from
    // the session parameter JDBC_CHUNK_DOWNLOADER_MAX_RETRY when that is set.
    private int prefetchMaxRetry = 1;
    // Fault-injection hook, set through setInjectedDownloaderException(); null means no injection.
    private static Throwable injectedDownloaderException = null;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the shared counter of memory currently reserved for result chunks.
     *
     * <p>The counter is static, so the value covers every downloader instance. The read
     * is performed while holding the counter object's monitor.
     *
     * @return the current value of the shared memory-usage counter
     */
    static long getCurrentMemoryUsage() {
        synchronized (currentMemoryUsage) {
            return currentMemoryUsage.get();
        }
    }

    /**
     * Installs a throwable to be injected into the downloader, or clears the injection.
     *
     * <p>While set, download tasks throw it, and {@code terminate()} rethrows it when it
     * is an {@link InterruptedException}.
     *
     * @param th the throwable to inject, or {@code null} to disable injection
     */
    static void setInjectedDownloaderException(Throwable th) {
        SnowflakeChunkDownloader.injectedDownloaderException = th;
    }

    /**
     * @return the OCSP mode taken from the serializable result set at construction time
     */
    public OCSPMode getOCSPMode() {
        return ocspMode;
    }

    /**
     * @return the HTTP client settings key taken from the serializable result set at
     *     construction time
     */
    public HttpClientSettingsKey getHttpClientSettingsKey() {
        return ocspModeAndProxyKey;
    }

    /**
     * @return the result stream provider taken from the serializable result set at
     *     construction time
     */
    public ResultStreamProvider getResultStreamProvider() {
        return resultStreamProvider;
    }

    /**
     * Creates the fixed-size thread pool that runs chunk download tasks.
     *
     * <p>Every worker is a daemon thread named {@code threadNamePrefix} followed by a
     * running number starting at 1, and logs uncaught exceptions through the class logger.
     *
     * @param threadNamePrefix prefix for the worker thread names
     * @param parallel number of threads in the pool
     * @return the newly created executor
     */
    private static ThreadPoolExecutor createChunkDownloaderExecutorService(final String threadNamePrefix, int parallel) {
        ThreadFactory namingFactory = new ThreadFactory(){
            // Number appended to the prefix for the next thread that gets created.
            private int nextThreadNumber = 1;

            @Override
            public Thread newThread(Runnable task) {
                Thread worker = new Thread(task);
                worker.setName(threadNamePrefix + nextThreadNumber);
                ++nextThreadNumber;
                worker.setUncaughtExceptionHandler((t, e) -> logger.error("Uncaught Exception in thread {}: {}", t, e));
                worker.setDaemon(true);
                return worker;
            }
        };
        return (ThreadPoolExecutor)Executors.newFixedThreadPool(parallel, namingFactory);
    }

    /**
     * Creates a downloader for the chunks described by a serializable result set and
     * immediately starts prefetching.
     *
     * <p>The constructor copies connection and timeout settings, builds one
     * {@link SnowflakeResultChunk} per chunk file (Arrow or JSON, depending on the result
     * format), creates the download thread pool and calls {@code startNextDownloaders()}.
     *
     * @param resultSetSerializable source of chunk metadata and connection settings
     * @throws SnowflakeSQLException if the chunk count is less than 1, the result format is
     *     neither ARROW nor JSON, starting the downloads fails, or an
     *     {@link OutOfMemoryError} occurs while starting them
     */
    public SnowflakeChunkDownloader(SnowflakeResultSetSerializableV1 resultSetSerializable) throws SnowflakeSQLException {
        this.queryId = resultSetSerializable.getQueryId();
        this.firstChunkRowCount = resultSetSerializable.getFirstChunkRowCount();
        this.snowflakeConnectionString = resultSetSerializable.getSnowflakeConnectString();
        this.ocspMode = resultSetSerializable.getOCSPMode();
        this.ocspModeAndProxyKey = resultSetSerializable.getHttpClientKey();
        this.qrmk = resultSetSerializable.getQrmk();
        this.networkTimeoutInMilli = resultSetSerializable.getNetworkTimeoutInMilli();
        this.authTimeout = resultSetSerializable.getAuthTimeout();
        this.socketTimeout = resultSetSerializable.getSocketTimeout();
        this.maxHttpRetries = resultSetSerializable.getMaxHttpRetries();
        // Two slots per prefetch thread.
        this.prefetchSlots = resultSetSerializable.getResultPrefetchThreads() * 2;
        this.queryResultFormat = resultSetSerializable.getQueryResultFormat();
        logger.debug("qrmk: {}", this.qrmk);
        this.chunkHeadersMap = resultSetSerializable.getChunkHeadersMap();
        this.session = resultSetSerializable.getSession() != null ? (SFBaseSession)resultSetSerializable.getSession().orElse(null) : null;
        if (this.session != null) {
            // Optional per-session override of the prefetch retry limit.
            Object maxRetryOverride = this.session.getOtherParameter("JDBC_CHUNK_DOWNLOADER_MAX_RETRY");
            if (maxRetryOverride != null) {
                this.prefetchMaxRetry = (Integer)maxRetryOverride;
            }
        }
        if (resultSetSerializable.getServerURL() != null) {
            try {
                SessionUtil.resetOCSPUrlIfNecessary(resultSetSerializable.getServerURL());
            }
            catch (IOException e) {
                // Best effort: a failure here is logged and construction continues.
                logger.debug("Exception while resetting OCSP URL", e);
            }
        }
        this.memoryLimit = resultSetSerializable.getMemoryLimit();
        if (this.session != null && this.session.getMemoryLimitForTesting() != SFBaseSession.MEMORY_LIMIT_UNSET) {
            this.memoryLimit = this.session.getMemoryLimitForTesting();
        }
        // Validate the count before it is used as an ArrayList capacity: a negative value would
        // otherwise surface as IllegalArgumentException instead of the driver's own exception.
        int chunkFileCount = resultSetSerializable.getChunkFileCount();
        if (chunkFileCount < 1) {
            throw new SnowflakeSQLLoggedException(this.session, ErrorCode.INTERNAL_ERROR, "Incorrect chunk count: " + chunkFileCount);
        }
        this.chunks = new ArrayList<SnowflakeResultChunk>(chunkFileCount);
        this.resultStreamProvider = resultSetSerializable.getResultStreamProvider();
        for (SnowflakeResultSetSerializableV1.ChunkFileMetadata chunkFileMetadata : resultSetSerializable.getChunkFileMetadatas()) {
            SnowflakeResultChunk chunk;
            switch (this.queryResultFormat) {
                case ARROW: {
                    this.rootAllocator = resultSetSerializable.getRootAllocator();
                    chunk = new ArrowResultChunk(chunkFileMetadata.getFileURL(), chunkFileMetadata.getRowCount(), resultSetSerializable.getColumnCount(), chunkFileMetadata.getUncompressedByteSize(), this.rootAllocator, this.session);
                    break;
                }
                case JSON: {
                    chunk = new JsonResultChunk(chunkFileMetadata.getFileURL(), chunkFileMetadata.getRowCount(), resultSetSerializable.getColumnCount(), chunkFileMetadata.getUncompressedByteSize(), this.session);
                    break;
                }
                default: {
                    throw new SnowflakeSQLLoggedException(this.session, ErrorCode.INTERNAL_ERROR, "Invalid result format: " + this.queryResultFormat.name());
                }
            }
            logger.debug("Add chunk: url: {} rowCount: {} uncompressedSize: {} neededChunkMemory: {}, chunkResultFormat: {}", chunk.getScrubbedUrl(), chunk.getRowCount(), chunk.getUncompressedSize(), chunk.computeNeededChunkMemory(), this.queryResultFormat.name());
            this.chunks.add(chunk);
        }
        // Never create more threads than there are chunks to download.
        int effectiveThreads = Math.min(resultSetSerializable.getResultPrefetchThreads(), chunkFileCount);
        logger.debug("#chunks: {} #threads: {} #slots: {} -> pool: {}", chunkFileCount, resultSetSerializable.getResultPrefetchThreads(), this.prefetchSlots, effectiveThreads);
        this.executor = SnowflakeChunkDownloader.createChunkDownloaderExecutorService("result-chunk-downloader-", effectiveThreads);
        try {
            this.startNextDownloaders();
        }
        catch (OutOfMemoryError outOfMemoryError) {
            // Log diagnostics and convert the error into the driver's SQL exception, carrying the stack trace text.
            this.logOutOfMemoryError();
            StringWriter errors = new StringWriter();
            outOfMemoryError.printStackTrace(new PrintWriter(errors));
            throw new SnowflakeSQLLoggedException(this.session, (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), "XX000", errors);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits download tasks for as many upcoming chunks as the prefetch window and the
     * memory limit allow.
     *
     * <p>For each candidate chunk the needed memory is first reserved on the shared
     * {@code currentMemoryUsage} counter. Depending on the result the chunk is submitted,
     * the reservation is rolled back and the loop stops, or the reservation is rolled back
     * and the thread sleeps with a growing, jittered delay before trying again.
     *
     * @throws SnowflakeSQLException if the thread is interrupted while sleeping between retries
     */
    private void startNextDownloaders() throws SnowflakeSQLException {
        long waitingTime = this.BASE_WAITING_MS;
        long getPrefetchMemRetry = 0L;
        // Continue while fewer than prefetchSlots chunks are ahead of the consumer and chunks remain.
        while (this.nextChunkToDownload - this.nextChunkToConsume < this.prefetchSlots && this.nextChunkToDownload < this.chunks.size()) {
            long curMem;
            SnowflakeResultChunk nextChunk = this.chunks.get(this.nextChunkToDownload);
            long neededChunkMemory = nextChunk.computeNeededChunkMemory();
            // A single chunk larger than the limit could never be reserved, so the limit is raised to fit it.
            if (neededChunkMemory > this.memoryLimit) {
                logger.debug("Thread {}: reset memoryLimit from {} MB to current chunk size {} MB", () -> Thread.currentThread().getId(), () -> this.memoryLimit / 1024L / 1024L, () -> neededChunkMemory / 1024L / 1024L);
                this.memoryLimit = neededChunkMemory;
            }
            // Reserve first, then check. Over the limit with either chunks already scheduled ahead of the
            // consumer, or nothing scheduled/consumed at all: roll back, mark the chunk FAILURE and stop.
            if ((curMem = currentMemoryUsage.addAndGet(neededChunkMemory)) > this.memoryLimit && (this.nextChunkToDownload - this.nextChunkToConsume > 0 || this.nextChunkToDownload == 0 && this.nextChunkToConsume == 0)) {
                logger.debug("Not enough memory available for prefetch. Cancel reserved memory. MemoryLimit: {}, curMem: {}, nextChunkToDownload: {}, nextChunkToConsume: {}, retry: {}", this.memoryLimit, curMem, this.nextChunkToDownload, this.nextChunkToConsume, getPrefetchMemRetry);
                currentMemoryUsage.addAndGet(-neededChunkMemory);
                nextChunk.getLock().lock();
                try {
                    nextChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                    break;
                }
                finally {
                    nextChunk.getLock().unlock();
                }
            }
            // Reservation fits within the limit: schedule the download.
            if (curMem <= this.memoryLimit) {
                if (this.queryResultFormat == QueryResultFormat.JSON) {
                    ((JsonResultChunk)nextChunk).tryReuse(this.chunkDataCache);
                }
                logger.debug("Thread {}: currentMemoryUsage in MB: {}, nextChunkToDownload: {}, nextChunkToConsume: {}, newReservedMemory in B: {} ", () -> Thread.currentThread().getId(), curMem / 0x100000L, this.nextChunkToDownload, this.nextChunkToConsume, neededChunkMemory);
                logger.debug("Submit chunk #{} for downloading, url: {}", this.nextChunkToDownload, nextChunk.getScrubbedUrl());
                nextChunk.getLock().lock();
                try {
                    // Only chunks that have never been started are submitted from here.
                    // NOTE(review): the reservation made above is not rolled back on this break — confirm this is intended.
                    if (nextChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.NOT_STARTED) {
                        break;
                    }
                }
                finally {
                    nextChunk.getLock().unlock();
                }
                Future<Void> downloaderFuture = this.executor.submit(SnowflakeChunkDownloader.getDownloadChunkCallable(this, nextChunk, this.qrmk, this.nextChunkToDownload, this.chunkHeadersMap, this.networkTimeoutInMilli, this.authTimeout, this.socketTimeout, this.maxHttpRetries, this.session, this.chunks.size(), this.queryId));
                this.downloaderFutures.put(this.nextChunkToDownload, downloaderFuture);
                ++this.nextChunkToDownload;
                // A successful submission resets the back-off delay.
                waitingTime = this.BASE_WAITING_MS;
                continue;
            }
            // Over the limit but not covered by the early-exit case above: roll back and retry after a pause.
            logger.debug("Cancel the reserved memory.", false);
            curMem = currentMemoryUsage.addAndGet(-neededChunkMemory);
            if (getPrefetchMemRetry > (long)this.prefetchMaxRetry) {
                logger.debug("Retry limit for prefetch has been reached. Cancel reserved memory and prefetch attempt.", false);
                nextChunk.getLock().lock();
                try {
                    nextChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                    break;
                }
                finally {
                    nextChunk.getLock().unlock();
                }
            }
            try {
                // Multiply the delay, cap it at MAX_WAITING_MS, then add random jitter below waitingTime / WAITING_JITTER_RATIO.
                waitingTime = (waitingTime *= this.WAITING_SECS_MULTIPLIER) > this.MAX_WAITING_MS ? this.MAX_WAITING_MS : waitingTime;
                long jitter = ThreadLocalRandom.current().nextLong(0L, waitingTime / this.WAITING_JITTER_RATIO);
                waitingTime += jitter;
                ++getPrefetchMemRetry;
                if (logger.isDebugEnabled()) {
                    logger.debug("Thread {} waiting for {} s: currentMemoryUsage in MB: {}, neededChunkMemory in MB: {}, nextChunkToDownload: {}, nextChunkToConsume: {}, retry: {}", () -> Thread.currentThread().getId(), (double)waitingTime / 1000.0, curMem / 0x100000L, neededChunkMemory / 0x100000L, this.nextChunkToDownload, this.nextChunkToConsume, getPrefetchMemRetry);
                }
                Thread.sleep(waitingTime);
            }
            catch (InterruptedException ie) {
                // The interruption is reported to the caller as an internal-error SQL exception.
                throw new SnowflakeSQLException("XX000", (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), "Waiting SnowflakeChunkDownloader has been interrupted.");
            }
        }
        // Cached chunk data is dropped once this scheduling pass is over.
        this.chunkDataCache.clear();
    }

    /**
     * Returns the memory reserved for one chunk to the shared usage counter, at most once
     * per chunk.
     *
     * <p>Nothing happens when the size to release is not positive or the chunk is already
     * marked as released.
     *
     * @param chunkId index of the chunk in {@code chunks}
     * @param optionalReleaseSize size to release; when empty, the chunk's own
     *     {@code computeNeededChunkMemory()} value is used
     */
    private void releaseCurrentMemoryUsage(int chunkId, Optional<Long> optionalReleaseSize) {
        final long releaseSize;
        if (optionalReleaseSize.isPresent()) {
            releaseSize = optionalReleaseSize.get().longValue();
        } else {
            releaseSize = this.chunks.get(chunkId).computeNeededChunkMemory();
        }
        if (releaseSize <= 0L) {
            return;
        }
        SnowflakeResultChunk chunk = this.chunks.get(chunkId);
        if (chunk.isReleased()) {
            return;
        }
        long curMem = currentMemoryUsage.addAndGet(-releaseSize);
        logger.debug("Thread {} - currentMemoryUsage in MB: {}, released in MB: {}, chunk: {}, optionalReleaseSize: {}, JVMFreeMem: {}", () -> Thread.currentThread().getId(), () -> curMem / 0x100000L, releaseSize, chunkId, optionalReleaseSize.isPresent(), Runtime.getRuntime().freeMemory());
        // Mark the chunk so a later call cannot subtract its memory a second time.
        this.chunks.get(chunkId).setReleased();
    }

    /**
     * Releases the reserved memory of every chunk with an index below
     * {@code nextChunkToDownload}. Does nothing when there are no chunks.
     */
    private void releaseAllChunkMemoryUsage() {
        if (this.chunks != null && !this.chunks.isEmpty()) {
            int chunkId = 0;
            while (chunkId < this.nextChunkToDownload) {
                this.releaseCurrentMemoryUsage(chunkId, Optional.empty());
                ++chunkId;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Hands out the next chunk in order, blocking until its download has finished.
     *
     * <p>Before returning a chunk, the data and reserved memory of the previously returned
     * chunk are freed and further downloads are scheduled. If the chunk's download ends in
     * FAILURE, all reserved memory is released, the downloader is terminated and an
     * exception is thrown.
     *
     * @return the next chunk, or {@code null} when every chunk has already been handed out
     * @throws InterruptedException if the thread is interrupted while waiting for the chunk
     *     or while terminating the downloader
     * @throws SnowflakeSQLException if the download failed, scheduling failed, or an
     *     {@link OutOfMemoryError} occurred while scheduling
     */
    @Override
    public SnowflakeResultChunk getNextChunkToConsume() throws InterruptedException, SnowflakeSQLException {
        if (this.nextChunkToConsume > 0) {
            int prevChunk = this.nextChunkToConsume - 1;
            logger.debug("Free chunk data for chunk #{}", prevChunk);
            // Capture the size before freeData() so the matching amount is released afterwards.
            long chunkMemUsage = this.chunks.get(prevChunk).computeNeededChunkMemory();
            if (this.queryResultFormat == QueryResultFormat.JSON) {
                // Keep the consumed chunk's data for reuse only while downloads are still outstanding.
                if (this.nextChunkToDownload < this.chunks.size()) {
                    this.chunkDataCache.add((JsonResultChunk)this.chunks.get(prevChunk));
                } else {
                    this.chunkDataCache.clear();
                }
            }
            this.chunks.get(prevChunk).freeData();
            this.releaseCurrentMemoryUsage(prevChunk, Optional.of(chunkMemUsage));
        }
        if (this.nextChunkToConsume >= this.chunks.size()) {
            logger.debug("No more chunk", false);
            return null;
        }
        try {
            this.startNextDownloaders();
        }
        catch (OutOfMemoryError outOfMemoryError) {
            this.logOutOfMemoryError();
            StringWriter errors = new StringWriter();
            outOfMemoryError.printStackTrace(new PrintWriter(errors));
            throw new SnowflakeSQLLoggedException(this.session, (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), "XX000", errors);
        }
        SnowflakeResultChunk currentChunk = this.chunks.get(this.nextChunkToConsume);
        // Fast path: the chunk is already downloaded, no need to take the lock and wait.
        if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.SUCCESS) {
            logger.debug("Chunk #{} is ready to consume", this.nextChunkToConsume);
            ++this.nextChunkToConsume;
            if (this.nextChunkToConsume == this.chunks.size()) {
                this.releaseCurrentMemoryUsage(this.nextChunkToConsume - 1, Optional.empty());
            }
            return currentChunk;
        }
        currentChunk.getLock().lock();
        try {
            logger.debug("Chunk#{} is not ready to consume", this.nextChunkToConsume);
            logger.debug("Consumer get lock to check chunk state", false);
            this.waitForChunkReady(currentChunk);
            if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.FAILURE) {
                this.releaseAllChunkMemoryUsage();
                logger.error("Downloader encountered error: {}", currentChunk.getDownloadError());
                if (currentChunk.getDownloadError().contains("java.lang.OutOfMemoryError: Java heap space")) {
                    this.logOutOfMemoryError();
                }
                throw new SnowflakeSQLLoggedException(this.session, (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), "XX000", currentChunk.getDownloadError());
            }
            logger.debug("Chunk#{} is ready to consume", this.nextChunkToConsume);
            ++this.nextChunkToConsume;
            return currentChunk;
        }
        finally {
            // Single cleanup path for both normal and exceptional exit.
            logger.debug("Consumer free lock", false);
            // Read the state before unlocking; terminate() itself runs after the lock is released.
            boolean terminateDownloader = currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.FAILURE;
            currentChunk.getLock().unlock();
            if (this.nextChunkToConsume == this.chunks.size()) {
                this.releaseCurrentMemoryUsage(this.nextChunkToConsume - 1, Optional.empty());
            }
            if (terminateDownloader) {
                logger.debug("Download result fail. Shut down the chunk downloader", false);
                this.terminate();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Waits until the given chunk is downloaded, re-submitting the download when it fails.
     *
     * <p>Each round waits on the chunk's download condition for up to
     * {@code downloadedConditionTimeoutInSeconds}. A timeout marks the chunk FAILURE and
     * ends the wait. If the chunk is not in SUCCESS state after a round, the current task
     * is cancelled, the chunk is reset and, after a short random pause, a new task is
     * submitted. Retries stop once {@code maxHttpRetries} is reached; a value of 0 or less
     * means no retry limit. The elapsed time is added to
     * {@code numberMillisWaitingForChunks}.
     *
     * <p>{@code getNextChunkToConsume()} calls this method while holding the chunk's lock.
     *
     * @param currentChunk the chunk the consumer is waiting for
     * @throws InterruptedException if the thread is interrupted while waiting or pausing
     */
    private void waitForChunkReady(SnowflakeResultChunk currentChunk) throws InterruptedException {
        int retry = 0;
        long startTime = System.currentTimeMillis();
        do {
            logger.debug("Thread {} is waiting for chunk#{} to be ready, current chunk state is: {}, retry: {}", new Object[]{Thread.currentThread().getId(), this.nextChunkToConsume, currentChunk.getDownloadState(), retry});
            // Only wait when the download has not reached a final state; await() returning false means timeout.
            if (currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.FAILURE && currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.SUCCESS && !currentChunk.getDownloadCondition().await(downloadedConditionTimeoutInSeconds, TimeUnit.SECONDS)) {
                logger.debug("Thread {} is timeout for waiting chunk#{} to be ready, current chunk state is: {}, retry: {}, scrubbedUrl: {}", new Object[]{Thread.currentThread().getId(), this.nextChunkToConsume, currentChunk.getDownloadState(), retry, currentChunk.getScrubbedUrl()});
                currentChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                currentChunk.setDownloadError(String.format("Timeout waiting for the download of chunk#%d(Total chunks: %d) retry: %d scrubbedUrl: %s", this.nextChunkToConsume, this.chunks.size(), retry, currentChunk.getScrubbedUrl()));
                break;
            }
            if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.SUCCESS) continue;
            logger.debug("Since downloadState is {} Thread {} decides to retry {} time(s) for chunk#{}", new Object[]{currentChunk.getDownloadState(), Thread.currentThread().getId(), ++retry, this.nextChunkToConsume});
            // Cancel the current attempt before starting a fresh one.
            Future<Void> downloaderFuture = this.downloaderFutures.get(this.nextChunkToConsume);
            if (downloaderFuture != null) {
                downloaderFuture.cancel(true);
            }
            HttpUtil.closeExpiredAndIdleConnections();
            this.chunks.get(this.nextChunkToConsume).getLock().lock();
            try {
                this.chunks.get(this.nextChunkToConsume).setDownloadState(SnowflakeResultChunk.DownloadState.IN_PROGRESS);
                this.chunks.get(this.nextChunkToConsume).reset();
            }
            finally {
                this.chunks.get(this.nextChunkToConsume).getLock().unlock();
            }
            // Random pause before re-submitting; ThreadLocalRandom avoids creating a new Random per retry.
            Thread.sleep(ThreadLocalRandom.current().nextInt(MAX_RETRY_JITTER));
            downloaderFuture = this.executor.submit(SnowflakeChunkDownloader.getDownloadChunkCallable(this, this.chunks.get(this.nextChunkToConsume), this.qrmk, this.nextChunkToConsume, this.chunkHeadersMap, this.networkTimeoutInMilli, this.authTimeout, this.socketTimeout, this.maxHttpRetries, this.session, this.chunks.size(), this.queryId));
            this.downloaderFutures.put(this.nextChunkToConsume, downloaderFuture);
            // The chunk has now been submitted from here, so the scheduler must not submit it again.
            if (this.nextChunkToDownload != this.nextChunkToConsume) continue;
            this.nextChunkToDownload = this.nextChunkToConsume + 1;
        } while (currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.SUCCESS && (this.maxHttpRetries <= 0 || retry < this.maxHttpRetries));
        if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.SUCCESS) {
            logger.debug("Ready to consume chunk#{}, succeed retry={}", this.nextChunkToConsume, retry);
        } else if (retry >= this.maxHttpRetries) {
            // The previous error text is kept by embedding it in the new message.
            currentChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
            currentChunk.setDownloadError(String.format("Max retry reached for the download of chunk#%d (Total chunks: %d) retry: %d, error: %s", this.nextChunkToConsume, this.chunks.size(), retry, this.chunks.get(this.nextChunkToConsume).getDownloadError()));
        }
        this.numberMillisWaitingForChunks += System.currentTimeMillis() - startTime;
    }

    /**
     * Logs, at error level, the downloader's timing and memory figures together with
     * suggestions for resolving an out-of-memory condition.
     */
    private void logOutOfMemoryError() {
        // Suggestion 3 starts on its own line; previously it ran straight on from the URL that ends suggestion 2.
        logger.error(
            "Dump some crucial information below:\n"
                + "Total milliseconds waiting for chunks: {},\n"
                + "Total memory used: {}, Max heap size: {}, total download time: {} millisec,\n"
                + "total parsing time: {} milliseconds, total chunks: {},\n"
                + "currentMemoryUsage in Byte: {}, currentMemoryLimit in Bytes: {} \n"
                + "nextChunkToDownload: {}, nextChunkToConsume: {}\n"
                + "Several suggestions to try to resolve the OOM issue:\n"
                + "1. increase the JVM heap size if you have more space; or \n"
                + "2. use CLIENT_MEMORY_LIMIT to reduce the memory usage by the JDBC driver (https://docs.snowflake.net/manuals/sql-reference/parameters.html#client-memory-limit)\n"
                + "3. please make sure 2 * CLIENT_PREFETCH_THREADS * CLIENT_RESULT_CHUNK_SIZE < CLIENT_MEMORY_LIMIT. If not, please reduce CLIENT_PREFETCH_THREADS and CLIENT_RESULT_CHUNK_SIZE too.",
            this.numberMillisWaitingForChunks,
            Runtime.getRuntime().totalMemory(),
            Runtime.getRuntime().maxMemory(),
            this.totalMillisDownloadingChunks.get(),
            this.totalMillisParsingChunks.get(),
            this.chunks.size(),
            currentMemoryUsage,
            this.memoryLimit,
            this.nextChunkToDownload,
            this.nextChunkToConsume);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the downloader down and reports its timing metrics.
     *
     * <p>Only the first call does any work: it cancels all recorded download tasks, shuts
     * the executor down (forcibly if it has not terminated within 3 seconds), logs summary
     * statistics and builds the metrics. Whether or not that succeeds, chunk data is freed,
     * the Arrow allocator is closed or the JSON cache cleared, all reserved chunk memory is
     * released and {@code chunks} is set to {@code null}.
     *
     * @return the collected metrics, or {@code null} if the downloader was already terminated
     * @throws InterruptedException if interrupted while awaiting executor termination, or if
     *     an {@link InterruptedException} has been injected
     */
    @Override
    public DownloaderMetrics terminate() throws InterruptedException {
        if (this.terminated.getAndSet(true)) {
            return null;
        }
        try {
            if (this.executor != null) {
                if (!this.executor.isShutdown()) {
                    for (Future<?> pending : this.downloaderFutures.values()) {
                        pending.cancel(true);
                    }
                    this.executor.shutdown();
                    boolean finishedInTime = this.executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS);
                    if (!finishedInTime) {
                        logger.debug("Executor did not terminate in the specified time.", false);
                        List<Runnable> droppedTasks = this.executor.shutdownNow();
                        logger.debug("Executor was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
                    }
                }
                if (injectedDownloaderException instanceof InterruptedException) {
                    throw (InterruptedException)injectedDownloaderException;
                }
            }
            long totalUncompressedSize = this.chunks.stream().mapToLong(chunk -> (long)chunk.getUncompressedSize()).sum();
            long rowsInChunks = this.chunks.stream().mapToLong(chunk -> (long)chunk.getRowCount()).sum();
            long chunksSize = this.chunks.size();
            String formatName = this.queryResultFormat == QueryResultFormat.ARROW ? "ARROW" : "JSON";
            logger.debug("Completed processing {} {} chunks for query {} in {} ms. Download took {} ms (average: {} ms), parsing took {} ms (average: {} ms). Chunks uncompressed size: {} MB (average: {} MB), rows in chunks: {} (total: {}, average in chunk: {}), total memory used: {} MB", chunksSize, formatName, this.queryId, this.totalMillisParsingChunks.get() + this.totalMillisDownloadingChunks.get(), this.totalMillisDownloadingChunks.get(), this.totalMillisDownloadingChunks.get() / chunksSize, this.totalMillisParsingChunks, this.totalMillisParsingChunks.get() / chunksSize, totalUncompressedSize / 0x100000L, totalUncompressedSize / 0x100000L / chunksSize, rowsInChunks, (long)this.firstChunkRowCount + rowsInChunks, rowsInChunks / chunksSize, Runtime.getRuntime().totalMemory() / 0x100000L);
            return new DownloaderMetrics(this.numberMillisWaitingForChunks, this.totalMillisDownloadingChunks.get(), this.totalMillisParsingChunks.get());
        }
        finally {
            // Cleanup runs on both the normal and the exceptional path.
            for (SnowflakeResultChunk chunk : this.chunks) {
                chunk.freeData();
            }
            if (this.queryResultFormat == QueryResultFormat.ARROW) {
                SFArrowResultSet.closeRootAllocator(this.rootAllocator);
            } else {
                this.chunkDataCache.clear();
            }
            this.releaseAllChunkMemoryUsage();
            this.chunks = null;
        }
    }

    /**
     * Adds one chunk's download duration to the running total.
     *
     * @param downloadTime duration to add, in milliseconds
     */
    private void addDownloadTime(long downloadTime) {
        totalMillisDownloadingChunks.addAndGet(downloadTime);
    }

    /**
     * Adds one chunk's parsing duration to the running total.
     *
     * @param parsingTime duration to add, in milliseconds
     */
    private void addParsingTime(long parsingTime) {
        totalMillisParsingChunks.addAndGet(parsingTime);
    }

    /**
     * Builds the task that downloads one result chunk and parses it into {@code resultChunk}.
     *
     * <p>The returned task never propagates a download/parse failure to its caller: any
     * {@link Throwable} raised while fetching or parsing is recorded on the chunk
     * (state {@code FAILURE} plus the stack trace as the download error) and the chunk's
     * download condition is signalled. On success the state becomes {@code SUCCESS} and the
     * condition is signalled as well.
     *
     * @param downloader            owning downloader; receives timing updates and memory release calls
     * @param resultChunk           chunk to fill; its lock and condition are used to publish the outcome
     * @param qrmk                  passed through unchanged to {@link ChunkDownloadContext}
     * @param chunkIndex            index of the chunk, used for logging and bookkeeping
     * @param chunkHeadersMap       passed through unchanged to {@link ChunkDownloadContext}
     * @param networkTimeoutInMilli passed through unchanged to {@link ChunkDownloadContext}
     * @param authTimeout           passed through unchanged to {@link ChunkDownloadContext}
     * @param socketTimeout         passed through unchanged to {@link ChunkDownloadContext}
     * @param maxHttpRetries        passed through unchanged to {@link ChunkDownloadContext}
     * @param session               session handed to the parser and to created exceptions
     * @param totalChunks           total number of chunks, used only in the summary log line
     * @param queryId               query id, used only in the summary log line
     * @return a callable that always returns {@code null}
     */
    private static Callable<Void> getDownloadChunkCallable(final SnowflakeChunkDownloader downloader, final SnowflakeResultChunk resultChunk, String qrmk, final int chunkIndex, Map<String, String> chunkHeadersMap, int networkTimeoutInMilli, int authTimeout, int socketTimeout, int maxHttpRetries, final SFBaseSession session, final int totalChunks, final String queryId) {
        final ChunkDownloadContext downloadContext = new ChunkDownloadContext(downloader, resultChunk, qrmk, chunkIndex, chunkHeadersMap, networkTimeoutInMilli, authTimeout, socketTimeout, maxHttpRetries, session);
        return new Callable<Void>(){
            /** Start of the phase currently being timed (download first, then parsing). */
            private long startTime;

            /**
             * Records the download time, parses the stream into the chunk and always closes
             * the stream afterwards.
             *
             * @param inputStream stream holding the chunk payload; closed by this method
             * @throws SnowflakeSQLException if parsing fails, or if closing the stream fails
             *                               after an otherwise successful parse
             */
            private void downloadAndParseChunk(InputStream inputStream) throws SnowflakeSQLException {
                // Everything since startTime was set in call() is accounted as download time.
                resultChunk.setDownloadTime(System.currentTimeMillis() - this.startTime);
                downloader.addDownloadTime(resultChunk.getDownloadTime());
                // Restart the clock for the parsing phase.
                this.startTime = System.currentTimeMillis();
                // Failure already on its way out of the try block, if any; lets the finally
                // block attach a close() failure to it instead of replacing it.
                Throwable primaryFailure = null;
                try {
                    if (downloader.queryResultFormat == QueryResultFormat.ARROW) {
                        ((ArrowResultChunk)resultChunk).readArrowStream(inputStream);
                    } else {
                        this.parseJsonToChunkV2(inputStream, resultChunk);
                    }
                }
                catch (Exception ex) {
                    logger.debug("Thread {} Exception when parsing result chunk#{}: {}", Thread.currentThread().getId(), chunkIndex, ex.getLocalizedMessage());
                    SnowflakeSQLLoggedException wrapped = new SnowflakeSQLLoggedException(session, "XX000", (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, "Exception: " + ex.getLocalizedMessage());
                    primaryFailure = wrapped;
                    throw wrapped;
                }
                catch (Error err) {
                    // Not wrapped, only remembered so a close() failure cannot mask it.
                    primaryFailure = err;
                    throw err;
                }
                finally {
                    // Runs for every outcome so the stream is never leaked.
                    logger.debug("Thread {} close input stream for chunk#{}", Thread.currentThread().getId(), chunkIndex);
                    try {
                        inputStream.close();
                    }
                    catch (IOException closeEx) {
                        if (primaryFailure == null) {
                            throw new SnowflakeSQLLoggedException(session, "XX000", (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), closeEx, "Exception: " + closeEx.getLocalizedMessage());
                        }
                        // Keep the original failure as the one that propagates.
                        primaryFailure.addSuppressed(closeEx);
                    }
                }
                resultChunk.setParseTime(System.currentTimeMillis() - this.startTime);
                downloader.addParsingTime(resultChunk.getParseTime());
            }

            /**
             * Downloads and parses the chunk, then publishes SUCCESS or FAILURE on the chunk
             * under its lock and signals the download condition.
             *
             * @return always {@code null}
             */
            @Override
            public Void call() {
                resultChunk.getLock().lock();
                try {
                    resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.IN_PROGRESS);
                }
                finally {
                    resultChunk.getLock().unlock();
                }
                logger.debug("Downloading chunk#{}, url: {}, Thread {}", chunkIndex, resultChunk.getUrl(), Thread.currentThread().getId());
                this.startTime = System.currentTimeMillis();
                TelemetryService.getInstance().updateContext(downloader.snowflakeConnectionString);
                try {
                    // When set, this exception is thrown instead of performing the download.
                    if (injectedDownloaderException != null) {
                        throw injectedDownloaderException;
                    }
                    InputStream is = downloader.getResultStreamProvider().getInputStream(downloadContext);
                    logger.debug("Thread {} start downloading chunk#{}", Thread.currentThread().getId(), chunkIndex);
                    this.downloadAndParseChunk(is);
                    logger.debug("Thread {} finish downloading chunk#{}", Thread.currentThread().getId(), chunkIndex);
                    downloader.downloaderFutures.remove(chunkIndex);
                    final String summaryFormat = "Processed {} chunk#{} in {} ms ({} out of {}) for query {}. Download took {} ms, parsing took {} ms. Chunk uncompressed size: {} kB, cols: {}, rows: {}, scrubbed URL: {}";
                    final String formatName = downloader.queryResultFormat == QueryResultFormat.ARROW ? "ARROW" : "JSON";
                    // Every fifth chunk is summarised at debug level, the others at trace level.
                    if (chunkIndex % 5 == 0) {
                        logger.debug(summaryFormat, formatName, chunkIndex, resultChunk.getTotalTime(), chunkIndex + 1, totalChunks, queryId, resultChunk.getDownloadTime(), resultChunk.getParseTime(), resultChunk.getUncompressedSize() / 1024, resultChunk.colCount, resultChunk.rowCount, resultChunk.getScrubbedUrl());
                    } else {
                        logger.trace(summaryFormat, formatName, chunkIndex, resultChunk.getTotalTime(), chunkIndex + 1, totalChunks, queryId, resultChunk.getDownloadTime(), resultChunk.getParseTime(), resultChunk.getUncompressedSize() / 1024, resultChunk.colCount, resultChunk.rowCount, resultChunk.getScrubbedUrl());
                    }
                    resultChunk.getLock().lock();
                    try {
                        logger.debug("Get lock to change the chunk to be ready to consume", false);
                        logger.debug("Wake up consumer if it is waiting for a chunk to be ready", false);
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.SUCCESS);
                        resultChunk.getDownloadCondition().signal();
                    }
                    finally {
                        logger.debug("Downloaded chunk#{}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                    }
                }
                catch (Throwable th) {
                    // Any failure above (including one raised while publishing SUCCESS) is
                    // recorded on the chunk rather than thrown from the task.
                    resultChunk.getLock().lock();
                    try {
                        logger.debug("Get lock to set chunk download error", false);
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                        downloader.releaseCurrentMemoryUsage(chunkIndex, Optional.empty());
                        // The full stack trace is stored as text on the chunk.
                        StringWriter errors = new StringWriter();
                        th.printStackTrace(new PrintWriter(errors));
                        resultChunk.setDownloadError(errors.toString());
                        logger.debug("Wake up consumer if it is waiting for a chunk to be ready", false);
                        resultChunk.getDownloadCondition().signal();
                    }
                    finally {
                        logger.debug("Failed to download chunk#{}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                    }
                    logger.debug("Thread {} Exception encountered ({}:{}) fetching chunk#{} from: {}, Error {}", Thread.currentThread().getId(), th.getClass().getName(), th.getLocalizedMessage(), chunkIndex, resultChunk.getScrubbedUrl(), resultChunk.getDownloadError());
                }
                return null;
            }

            /**
             * Feeds the JSON stream to a {@link ResultJsonParserV2} in pieces of up to 1 MiB.
             *
             * <p>Bytes the parser leaves unconsumed after a piece are carried over and
             * prepended to the next piece; whatever is still left at end of stream is handed
             * to {@code endParsing}.
             *
             * @param jsonInputStream stream to read; not closed here
             * @param jsonChunk       chunk to populate; cast to {@link JsonResultChunk}
             * @throws IOException           if reading the stream fails
             * @throws SnowflakeSQLException if the parser reports an error
             */
            private void parseJsonToChunkV2(InputStream jsonInputStream, SnowflakeResultChunk jsonChunk) throws IOException, SnowflakeSQLException {
                ResultJsonParserV2 jp = new ResultJsonParserV2();
                jp.startParsing((JsonResultChunk)jsonChunk, session);
                // Fixed-size read buffer (0x100000 bytes = 1 MiB); never replaced, so it
                // cannot grow when leftovers are carried between reads.
                final byte[] readBuf = new byte[0x100000];
                byte[] leftover = null;
                int len;
                logger.debug("Thread {} start to read inputstream for chunk#{}", Thread.currentThread().getId(), chunkIndex);
                while ((len = jsonInputStream.read(readBuf)) != -1) {
                    ByteBuffer piece;
                    if (leftover != null) {
                        // Prepend the unconsumed tail of the previous piece to the bytes
                        // just read, copying only the len valid bytes of readBuf.
                        byte[] combined = new byte[leftover.length + len];
                        System.arraycopy(leftover, 0, combined, 0, leftover.length);
                        System.arraycopy(readBuf, 0, combined, leftover.length, len);
                        piece = ByteBuffer.wrap(combined, 0, combined.length);
                    } else {
                        piece = ByteBuffer.wrap(readBuf, 0, len);
                    }
                    jp.continueParsing(piece, session);
                    if (piece.remaining() > 0) {
                        leftover = new byte[piece.remaining()];
                        piece.get(leftover);
                    } else {
                        leftover = null;
                    }
                }
                logger.debug("Thread {} finish reading inputstream for chunk#{}", Thread.currentThread().getId(), chunkIndex);
                ByteBuffer tail = leftover != null ? ByteBuffer.wrap(leftover) : ByteBuffer.wrap(new byte[0]);
                jp.endParsing(tail, session);
            }
        };
    }

    /**
     * {@link ChunkDownloader} implementation with no behavior: it never hands out a
     * chunk and reports no metrics on termination.
     */
    public static class NoOpChunkDownloader implements ChunkDownloader {
        /**
         * @return always {@code null}
         * @throws SnowflakeSQLException declared by the interface; never thrown here
         */
        @Override
        public SnowflakeResultChunk getNextChunkToConsume() throws SnowflakeSQLException {
            return null;
        }

        /**
         * @return always {@code null}
         */
        @Override
        public DownloaderMetrics terminate() {
            return null;
        }
    }
}

