/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
import net.snowflake.ingest.internal.com.codahale.metrics.Timer;
import net.snowflake.ingest.internal.com.google.common.annotations.VisibleForTesting;
import net.snowflake.ingest.streaming.internal.BlobBuilder;
import net.snowflake.ingest.streaming.internal.BlobMetadata;
import net.snowflake.ingest.streaming.internal.BlobStats;
import net.snowflake.ingest.streaming.internal.ChannelCache;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChunkMetadata;
import net.snowflake.ingest.streaming.internal.IStorageManager;
import net.snowflake.ingest.streaming.internal.RegisterService;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.streaming.internal.StreamingIngestStorage;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/**
 * Periodically flushes buffered channel data of a streaming ingest client: it groups channel data
 * into blobs, builds and uploads each blob on a worker pool, and then hands the uploaded blobs to
 * the {@link RegisterService} for registration.
 *
 * @param <T> type parameter shared with the owning client, the channel cache and the storage manager
 */
class FlushService<T> {
    // NOTE(review): not referenced anywhere in the visible part of this file.
    private static final int DEFAULT_MAX_UPLOAD_RETRIES = 5;
    private static final Logging logger = new Logging(FlushService.class);
    // Client this service belongs to; source of parameters, metrics and telemetry.
    private final SnowflakeStreamingIngestClientInternal<T> owningClient;
    // Single scheduled thread that runs the periodic flush check and the flush distribution.
    @VisibleForTesting
    ScheduledExecutorService flushWorker;
    // Single thread on which blob registration runs.
    @VisibleForTesting
    ExecutorService registerWorker;
    // Fixed-size pool that executes the build-and-upload task of each blob.
    @VisibleForTesting
    ExecutorService buildUploadWorkers;
    // Per-table channels whose buffered data gets flushed.
    private final ChannelCache<T> channelCache;
    // Generates blob paths and resolves the storage a blob is uploaded to.
    private final IStorageManager<T, ?> storageManager;
    // Receives the (blob, upload future) pairs produced by every flush.
    private final RegisterService<T> registerService;
    // Wall-clock millis of the most recent flush distribution (initialised at construction).
    @VisibleForTesting
    volatile long lastFlushTime;
    // Client-wide "flush requested" flag; set by setNeedFlush and cleared after a flush is distributed.
    @VisibleForTesting
    volatile boolean isNeedFlush;
    // When true, only forced flushes distribute work (see flush).
    private final boolean isTestMode;
    // Flush-latency timer contexts keyed by blob path; passed on to the register service.
    private final Map<String, Timer.Context> latencyTimerContextMap;
    // Blob format version, read once from the client's parameter provider.
    private final Constants.BdecVersion bdecVersion;
    // Processor count, refreshed on every flush distribution; used as the queue-length throttle threshold.
    private volatile int numProcessors = Runtime.getRuntime().availableProcessors();

    /**
     * Creates the flush service for {@code client} and immediately starts its worker executors,
     * including the periodic background flush check.
     *
     * @param client owning client; parameters, metrics and telemetry are read from it
     * @param cache channel cache whose tables are flushed
     * @param storageManager generates blob paths and resolves the upload storage
     * @param isTestMode forwarded to the register service; when true only forced flushes do work
     */
    FlushService(SnowflakeStreamingIngestClientInternal<T> client, ChannelCache<T> cache, IStorageManager<T, ?> storageManager, boolean isTestMode) {
        // Plain state first.
        this.owningClient = client;
        this.channelCache = cache;
        this.storageManager = storageManager;
        this.isTestMode = isTestMode;
        this.isNeedFlush = false;
        this.latencyTimerContextMap = new ConcurrentHashMap<>();

        // Collaborators and values derived from the client.
        this.registerService = new RegisterService<>(client, isTestMode);
        this.lastFlushTime = System.currentTimeMillis();
        this.bdecVersion = client.getParameterProvider().getBlobFormatVersion();

        // Must come last: the scheduled flush task uses the fields initialised above.
        createWorkers();
    }

    /**
     * Schedules a task on the flush worker that records the current process CPU load (as a
     * percentage) in the client's CPU histogram, if that histogram exists.
     *
     * @return future that completes once the sample has been taken (or skipped)
     */
    private CompletableFuture<Void> statsFuture() {
        Runnable recordCpuLoad = () -> {
            if (this.owningClient.cpuHistogram == null) {
                // Metrics are not enabled for this client; nothing to record.
                return;
            }
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            double processCpuLoad = osBean.getProcessCpuLoad();
            this.owningClient.cpuHistogram.update((long)(processCpuLoad * 100.0));
        };
        return CompletableFuture.runAsync(recordCpuLoad, this.flushWorker);
    }

    /**
     * Schedules, on the flush worker, the distribution of flush tasks for the given tables and
     * afterwards resets the client-wide and per-table flush bookkeeping.
     *
     * @param isForce whether this flush was explicitly forced (used for logging only)
     * @param tablesToFlush names of the tables whose channels should be flushed
     * @param flushStartTime wall-clock millis at which the flush was requested (used for logging)
     * @return future that completes when all build/upload tasks have been submitted
     */
    private CompletableFuture<Void> distributeFlush(boolean isForce, Set<String> tablesToFlush, Long flushStartTime) {
        Runnable distributeTask = () -> {
            logFlushTask(isForce, tablesToFlush, flushStartTime);
            distributeFlushTasks(tablesToFlush);

            // One timestamp is shared by the client-wide marker and every flushed table.
            final long flushEndTime = System.currentTimeMillis();
            this.lastFlushTime = flushEndTime;
            this.isNeedFlush = false;
            tablesToFlush.forEach(tableName -> {
                this.channelCache.setLastFlushTime(tableName, flushEndTime);
                this.channelCache.setNeedFlush(tableName, false);
            });
        };
        return CompletableFuture.runAsync(distributeTask, this.flushWorker);
    }

    /**
     * Logs that a flush task is being submitted. The message is emitted at trace level when trace
     * logging is enabled; otherwise it is emitted at debug level, but only for forced flushes or
     * when a flush was explicitly requested. In every other case nothing is logged, and the
     * message is not even built.
     *
     * @param isForce whether the flush was forced
     * @param tablesToFlush tables that are about to be flushed
     * @param flushStartTime wall-clock millis at which the flush was requested
     */
    private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
        boolean oneChunkPerBlob = this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1;
        boolean isNeedFlush = oneChunkPerBlob ? tablesToFlush.stream().anyMatch(this.channelCache::getNeedFlush) : this.isNeedFlush;
        boolean traceEnabled = logger.isTraceEnabled();
        if (!traceEnabled && !isNeedFlush && !isForce) {
            // Nothing would be logged; skip the per-table formatting work entirely.
            return;
        }

        long currentTime = System.currentTimeMillis();
        String logInfo;
        if (oneChunkPerBlob) {
            // Flush state is tracked per table in this mode, so report it per table.
            String perTableInfo = tablesToFlush.stream().map(table -> {
                long tableLastFlushTime = this.channelCache.getLastFlushTime(table);
                return String.format("(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)", table, this.channelCache.getNeedFlush(table), flushStartTime - tableLastFlushTime, currentTime - tableLastFlushTime);
            }).collect(Collectors.joining(", "));
            logInfo = String.format("Tables=[%s]", perTableInfo);
        } else {
            logInfo = String.format("isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s", isNeedFlush, flushStartTime - this.lastFlushTime, currentTime - this.lastFlushTime);
        }

        String flushTaskLogFormat = String.format("Submit forced or ad-hoc flush task on client=%s, isForce=%s, %s", this.owningClient.getName(), isForce, logInfo);
        if (traceEnabled) {
            logger.logTrace(flushTaskLogFormat);
        } else {
            logger.logDebug(flushTaskLogFormat);
        }
    }

    /**
     * Schedules blob registration on the register worker, passing along the flush-latency timer
     * contexts collected while distributing flush tasks.
     *
     * @return future that completes when the register service returns
     */
    private CompletableFuture<Void> registerFuture() {
        Runnable registerTask = () -> this.registerService.registerBlobs(this.latencyTimerContextMap);
        return CompletableFuture.runAsync(registerTask, this.registerWorker);
    }

    /**
     * Kicks off a flush if one is due.
     *
     * <p>When the client allows exactly one chunk per blob, tables are selected individually: a
     * table is flushed if the flush is forced, its own lag interval has elapsed, or it was flagged
     * as needing a flush. Otherwise all tables are flushed together when the flush is forced, or,
     * outside test mode, when a flush was requested or the client-wide lag interval has elapsed.
     *
     * @param isForce flush regardless of timers, flags and test mode
     * @return future covering CPU stats, flush distribution and registration; only the CPU stats
     *     future when there is nothing to flush
     */
    CompletableFuture<Void> flush(boolean isForce) {
        final long flushStartTime = System.currentTimeMillis();
        final long flushingInterval = this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();

        final Set<String> tablesToFlush;
        if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
            tablesToFlush = this.channelCache.keySet().stream()
                    .filter(table -> isForce
                            || flushStartTime - this.channelCache.getLastFlushTime(table) >= flushingInterval
                            || this.channelCache.getNeedFlush(table))
                    .collect(Collectors.toSet());
        } else if (isForce || !this.isTestMode() && (this.isNeedFlush || flushStartTime - this.lastFlushTime >= flushingInterval)) {
            tablesToFlush = this.channelCache.keySet();
        } else {
            tablesToFlush = null;
        }

        boolean nothingToFlush = !isForce && (this.isTestMode() || tablesToFlush == null || tablesToFlush.isEmpty());
        if (nothingToFlush) {
            return this.statsFuture();
        }
        return this.statsFuture()
                .thenCompose(ignored -> this.distributeFlush(isForce, tablesToFlush, flushStartTime))
                .thenCompose(ignored -> this.registerFuture());
    }

    /**
     * Creates the three executors used by this service:
     * <ul>
     *   <li>a single scheduled flush thread that runs a non-forced {@link #flush(boolean)} at the
     *       configured check interval,</li>
     *   <li>a single register thread, and</li>
     *   <li>a fixed pool of build/upload threads sized
     *       {@code availableProcessors * (1 + ioTimeCpuRatio)}, capped at {@link Integer#MAX_VALUE}.</li>
     * </ul>
     */
    private void createWorkers() {
        ThreadFactory flushThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-flush-thread").build();
        this.flushWorker = Executors.newSingleThreadScheduledExecutor(flushThreadFactory);
        this.flushWorker.scheduleWithFixedDelay(() -> {
            try {
                this.flush(false).exceptionally(e -> {
                    // Failures of async stages normally arrive wrapped (the real error is the cause);
                    // fall back to the throwable itself so a cause-less failure is still reported
                    // instead of dying with a NullPointerException inside this handler.
                    Throwable cause = e.getCause() != null ? e.getCause() : e;
                    this.logAndReportFailure(String.format("Background flush task failed, client=%s, exception=%s, detail=%s, trace=%s.", this.owningClient.getName(), cause, cause.getMessage(), Utils.getStackTrace(cause)));
                    return null;
                });
            }
            catch (Exception e) {
                this.logAndReportFailure(String.format("Failed to schedule a flush task, client=%s, exception=%s, detail=%s, trace=%s.", this.owningClient.getName(), e.getClass().getName(), e.getMessage(), Utils.getStackTrace(e)));
                // NOTE: per the ScheduledExecutorService contract, an exception escaping a periodic
                // task suppresses all of its subsequent executions.
                throw e;
            }
        }, 0L, this.owningClient.getParameterProvider().getBufferFlushCheckIntervalInMs(), TimeUnit.MILLISECONDS);

        ThreadFactory registerThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-register-thread").build();
        this.registerWorker = Executors.newSingleThreadExecutor(registerThreadFactory);

        ThreadFactory buildUploadThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-build-upload-thread-%d").build();
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        // Multiply in long: an int product could overflow (and go negative) before the cap is
        // applied, which would make the cap a no-op and the pool creation fail.
        long desiredThreadCount = (long)availableProcessors * (1L + this.owningClient.getParameterProvider().getIOTimeCpuRatio());
        int buildUploadThreadCount = (int)Math.min(desiredThreadCount, (long)Integer.MAX_VALUE);
        this.buildUploadWorkers = Executors.newFixedThreadPool(buildUploadThreadCount, buildUploadThreadFactory);
        logger.logInfo("Create {} threads for build/upload blobs for client={}, total available processors={}", buildUploadThreadCount, this.owningClient.getName(), availableProcessors);
    }

    /**
     * Logs {@code errorMessage} at error level and, when the client has a telemetry service,
     * also reports it there as a client failure.
     */
    private void logAndReportFailure(String errorMessage) {
        logger.logError(errorMessage);
        if (this.owningClient.getTelemetryService() != null) {
            this.owningClient.getTelemetryService().reportClientFailure(this.getClass().getSimpleName(), errorMessage);
        }
    }

    /**
     * Groups the buffered data of the given tables into blobs and submits one asynchronous
     * build-and-upload task per blob to {@code buildUploadWorkers}. Each blob is a list of chunks,
     * and each chunk is the list of channel data taken from one table. The resulting
     * (blob data, upload future) pairs are handed to the register service at the end.
     *
     * @param tablesToFlush names of the tables whose channels should be flushed
     */
    void distributeFlushTasks(Set<String> tablesToFlush) {
        // Only visit cache entries whose key is one of the requested tables.
        Iterator itr = this.channelCache.entrySet().stream().filter(e -> tablesToFlush.contains(e.getKey())).iterator();
        ArrayList blobs = new ArrayList();
        // Channel data of a table that did not fit into the current chunk/blob; it seeds the next one.
        ArrayList leftoverChannelsDataPerTable = new ArrayList();
        // Refresh the processor count that throttleDueToQueuedFlushTasks compares the queue length against.
        this.numProcessors = Runtime.getRuntime().availableProcessors();
        // Outer loop: one iteration per blob.
        while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
            ArrayList blobData = new ArrayList();
            float totalBufferSizeInBytes = 0.0f;
            String blobPath = this.storageManager.generateBlobPath();
            // Inner loop: one iteration per chunk (i.e. per table, or per leftover part of a table).
            while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
                int idx;
                // Synchronized because the parallel stream below adds to it from several threads.
                List channelsDataPerTable = Collections.synchronizedList(new ArrayList());
                if (!leftoverChannelsDataPerTable.isEmpty()) {
                    // Leftovers from the previous split take priority over the next table.
                    channelsDataPerTable.addAll(leftoverChannelsDataPerTable);
                    leftoverChannelsDataPerTable.clear();
                } else {
                    // Close the blob once it holds the configured maximum number of chunks.
                    if (blobData.size() >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
                        logger.logInfo("Max allowed number of chunks in the current blob reached. chunkCount={} maxChunkCount={} currentBlobPath={}", blobData.size(), this.owningClient.getParameterProvider().getMaxChunksInBlob(), blobPath);
                        break;
                    }
                    ConcurrentHashMap table = (ConcurrentHashMap)((Map.Entry)itr.next()).getValue();
                    // Collect the data of every valid channel of this table; channels returning null are skipped.
                    table.values().parallelStream().forEach(channel -> {
                        ChannelData data;
                        if (channel.isValid() && (data = channel.getData(blobPath)) != null) {
                            channelsDataPerTable.add(data);
                        }
                    });
                }
                if (channelsDataPerTable.isEmpty()) continue;
                float totalBufferSizePerTableInBytes = 0.0f;
                // Take channel data in order until a limit or a key/schema mismatch requires a split.
                for (idx = 0; idx < channelsDataPerTable.size(); ++idx) {
                    ChannelData channelData = (ChannelData)channelsDataPerTable.get(idx);
                    // The first element is always accepted; later ones are compared with their predecessor.
                    if (idx > 0 && this.shouldStopProcessing(totalBufferSizeInBytes, totalBufferSizePerTableInBytes, channelData, (ChannelData)channelsDataPerTable.get(idx - 1))) {
                        leftoverChannelsDataPerTable.addAll(channelsDataPerTable.subList(idx, channelsDataPerTable.size()));
                        logger.logInfo("Creation of another blob is needed because of blob/chunk size limit or different encryption ids or different schema, client={}, table={}, blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={}, encryptionId2={}, schema1={}, schema2={}", this.owningClient.getName(), channelData.getChannelContext().getTableName(), Float.valueOf(totalBufferSizeInBytes), Float.valueOf(totalBufferSizePerTableInBytes), Float.valueOf(channelData.getBufferSize()), channelData.getChannelContext().getEncryptionKeyId(), ((ChannelData)channelsDataPerTable.get(idx - 1)).getChannelContext().getEncryptionKeyId(), channelData.getColumnEps().keySet(), ((ChannelData)channelsDataPerTable.get(idx - 1)).getColumnEps().keySet());
                        break;
                    }
                    totalBufferSizeInBytes += channelData.getBufferSize();
                    totalBufferSizePerTableInBytes += channelData.getBufferSize();
                }
                // The accepted prefix [0, idx) becomes one chunk of the current blob.
                blobData.add(channelsDataPerTable.subList(0, idx));
                // Whole table consumed: keep filling this blob. Otherwise the table was split, so close the blob.
                if (idx == channelsDataPerTable.size()) continue;
                break;
            }
            if (blobData.isEmpty()) {
                // No data ended up in this blob, so the generated path goes unused.
                // NOTE(review): presumably this gives back the sequence number taken by generateBlobPath — confirm.
                this.storageManager.decrementBlobSequencer();
                continue;
            }
            long flushStartMs = System.currentTimeMillis();
            if (this.owningClient.flushLatency != null) {
                // Start the flush-latency timer for this blob, keyed by its path.
                this.latencyTimerContextMap.putIfAbsent(blobPath, this.owningClient.flushLatency.time());
            }
            blobs.add(new Pair(new BlobData(blobPath, blobData), CompletableFuture.supplyAsync(() -> {
                try {
                    // The storage is resolved from the table of the first channel in the first chunk.
                    String fullyQualifiedTableName = ((ChannelData)((List)blobData.get(0)).get(0)).getChannelContext().getFullyQualifiedTableName();
                    BlobMetadata blobMetadata = this.buildAndUpload(blobPath, blobData, fullyQualifiedTableName);
                    blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
                    return blobMetadata;
                }
                catch (Throwable e) {
                    // Log and report the underlying cause when there is one.
                    Throwable ex = e.getCause() == null ? e : e.getCause();
                    String errorMessage = String.format("Building blob failed, client=%s, blob=%s, exception=%s, detail=%s, trace=%s, all channels in the blob will be invalidated", this.owningClient.getName(), blobPath, ex, ex.getMessage(), Utils.getStackTrace(ex));
                    logger.logError(errorMessage);
                    if (this.owningClient.getTelemetryService() != null) {
                        this.owningClient.getTelemetryService().reportClientFailure(this.getClass().getSimpleName(), errorMessage);
                    }
                    // NOTE(review): the type checks below inspect e, not the unwrapped ex.
                    if (e instanceof IOException) {
                        // I/O failure: invalidate the blob's channels here and complete the future with null.
                        this.invalidateAllChannelsInBlob(blobData, errorMessage);
                        return null;
                    }
                    if (e instanceof NoSuchAlgorithmException) {
                        throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE, new Object[0]);
                    }
                    if (e instanceof InvalidAlgorithmParameterException | e instanceof NoSuchPaddingException | e instanceof IllegalBlockSizeException | e instanceof BadPaddingException | e instanceof InvalidKeyException) {
                        throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE, new Object[0]);
                    }
                    throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
                }
            }, this.buildUploadWorkers)));
            logger.logInfo("buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}", this.owningClient.getName(), blobPath, this.buildUploadWorkers.toString());
        }
        // Hand every submitted blob (with its pending upload future) to the register service.
        this.registerService.addBlobs(blobs);
    }

    /**
     * Decides whether {@code current} must start a new chunk/blob instead of joining the one
     * being assembled.
     *
     * @param totalBufferSizeInBytes bytes already accepted into the current blob
     * @param totalBufferSizePerTableInBytes bytes already accepted into the current chunk
     * @param current channel data about to be added
     * @param prev channel data added immediately before {@code current}
     * @return true if adding {@code current} would exceed the blob or chunk size limit, or if its
     *     encryption key id or column set differs from {@code prev}
     */
    private boolean shouldStopProcessing(float totalBufferSizeInBytes, float totalBufferSizePerTableInBytes, ChannelData<T> current, ChannelData<T> prev) {
        // Blob size limit: 1.0737418E9f is the float value of 2^30 bytes.
        if (totalBufferSizeInBytes + current.getBufferSize() > 1.0737418E9f) {
            return true;
        }
        // Chunk size limit from the client parameters.
        if (totalBufferSizePerTableInBytes + current.getBufferSize() > (float)this.owningClient.getParameterProvider().getMaxChunkSizeInBytes()) {
            return true;
        }
        // Channels with different encryption keys cannot share a chunk.
        if (!Objects.equals(current.getChannelContext().getEncryptionKeyId(), prev.getChannelContext().getEncryptionKeyId())) {
            return true;
        }
        // Neither can channels whose column sets differ.
        return !current.getColumnEps().keySet().equals(prev.getColumnEps().keySet());
    }

    /**
     * Builds the blob bytes and metadata for {@code blobData}, records the build duration in the
     * blob stats, and uploads the result to the storage of the given table.
     *
     * @param blobPath path the blob is built for and uploaded to
     * @param blobData chunks of channel data that make up the blob
     * @param fullyQualifiedTableName table used to look up the target storage
     * @return metadata describing the uploaded blob
     */
    BlobMetadata buildAndUpload(String blobPath, List<List<ChannelData<T>>> blobData, String fullyQualifiedTableName) throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException {
        // Start timing before the blob is constructed so the whole build is measured.
        Timer.Context buildTimer = Utils.createTimerContext(this.owningClient.buildLatency);
        BlobBuilder.Blob builtBlob = BlobBuilder.constructBlobAndMetadata(blobPath, blobData, this.bdecVersion, this.owningClient.getInternalParameterProvider().getEnableChunkEncryption());
        BlobStats stats = builtBlob.blobStats;
        stats.setBuildDurationMs(buildTimer);

        StreamingIngestStorage<T, ?> targetStorage = this.storageManager.getStorage(fullyQualifiedTableName);
        return this.upload(targetStorage, blobPath, builtBlob.blobBytes, builtBlob.chunksMetadataList, stats);
    }

    /**
     * Uploads the blob bytes to {@code storage} and returns the metadata describing the upload.
     * When upload metrics are available, the upload duration, throughput, blob size and total row
     * count are recorded as well.
     *
     * @param storage storage the blob is written to
     * @param blobPath path of the blob inside the storage
     * @param blob serialized blob content
     * @param metadata per-chunk metadata of the blob; may be null, which is treated as "no chunks"
     * @param blobStats stats object that receives the upload duration
     * @return metadata of the uploaded blob, including its MD5 checksum
     * @throws NoSuchAlgorithmException if the MD5 digest is not available
     */
    BlobMetadata upload(StreamingIngestStorage<T, ?> storage, String blobPath, byte[] blob, List<ChunkMetadata> metadata, BlobStats blobStats) throws NoSuchAlgorithmException {
        logger.logInfo("Start uploading blob={}, size={}", blobPath, blob.length);
        long startTime = System.currentTimeMillis();
        Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
        storage.put(blobPath, blob);
        if (uploadContext != null) {
            blobStats.setUploadDurationMs(uploadContext);
            this.owningClient.uploadThroughput.mark(blob.length);
            this.owningClient.blobSizeHistogram.update(blob.length);
            // The return statement below tolerates null metadata; do the same here so an
            // already-uploaded blob is not lost to a NullPointerException while recording metrics.
            long totalRowCount = metadata == null ? 0L : metadata.stream().mapToLong(chunk -> chunk.getEpInfo().getRowCount()).sum();
            this.owningClient.blobRowCountHistogram.update(totalRowCount);
        }
        logger.logInfo("Finish uploading blob={}, size={}, timeInMillis={}", blobPath, blob.length, System.currentTimeMillis() - startTime);
        boolean hasMultipleChunks = metadata != null && metadata.size() > 1;
        return BlobMetadata.createBlobMetadata(blobPath, BlobBuilder.computeMD5(blob), this.bdecVersion, metadata, blobStats, hasMultipleChunks);
    }

    /**
     * Shuts down all three executors gracefully and waits up to 300 seconds for each of them, in
     * order (flush, register, build/upload). As soon as one of them fails to terminate in time,
     * waiting stops and all three are shut down forcibly.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void shutdown() throws InterruptedException {
        ExecutorService[] workers = {this.flushWorker, this.registerWorker, this.buildUploadWorkers};
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }

        boolean allTerminated = true;
        for (ExecutorService worker : workers) {
            if (!worker.awaitTermination(300L, TimeUnit.SECONDS)) {
                // Do not wait for the remaining workers once one has timed out.
                allTerminated = false;
                break;
            }
        }
        if (allTerminated) {
            return;
        }

        logger.logWarn("Tasks can't be terminated within the timeout, force shutdown now.");
        for (ExecutorService worker : workers) {
            worker.shutdownNow();
        }
    }

    /**
     * Requests a flush at the next flush check. The client-wide flag is always raised; the
     * per-table flag is raised too when the client allows exactly one chunk per blob.
     *
     * @param fullyQualifiedTableName table whose data should be flushed
     */
    void setNeedFlush(String fullyQualifiedTableName) {
        this.isNeedFlush = true;
        boolean oneChunkPerBlob = this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1;
        if (!oneChunkPerBlob) {
            return;
        }
        this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
    }

    /**
     * Asks the client's channel cache to invalidate every channel that contributed data to the
     * blob, provided the channel's sequencer still matches the one captured with the data.
     *
     * @param blobData chunks of channel data that make up the blob
     * @param invalidationCause reason passed along with each invalidation
     */
    <CD> void invalidateAllChannelsInBlob(List<List<ChannelData<CD>>> blobData, String invalidationCause) {
        for (List<ChannelData<CD>> chunkData : blobData) {
            chunkData.forEach(channelData -> {
                this.owningClient.getChannelCache().invalidateChannelIfSequencersMatch(
                        channelData.getChannelContext().getDbName(),
                        channelData.getChannelContext().getSchemaName(),
                        channelData.getChannelContext().getTableName(),
                        channelData.getChannelContext().getName(),
                        channelData.getChannelContext().getChannelSequencer(),
                        invalidationCause);
            });
        }
    }

    /**
     * Reports whether callers should be throttled because more build/upload tasks are waiting in
     * the worker pool's queue than there are processors. A warning is logged when that is the case.
     *
     * @return true if the number of queued build/upload tasks exceeds the processor count
     */
    boolean throttleDueToQueuedFlushTasks() {
        int queuedTaskCount = ((ThreadPoolExecutor)this.buildUploadWorkers).getQueue().size();
        if (queuedTaskCount <= this.numProcessors) {
            return false;
        }
        logger.logWarn("Throttled due too many queue flush tasks (probably because of slow uploading speed), client={}, buildUploadWorkers stats={}", this.owningClient.getName(), this.buildUploadWorkers.toString());
        return true;
    }

    /**
     * @return true if this service was created in test mode, in which only forced flushes
     *     distribute work
     */
    boolean isTestMode() {
        return isTestMode;
    }

    /**
     * Immutable pairing of a blob path with the channel data that goes into that blob. The data
     * is a list of chunks, each chunk being a list of channel data.
     *
     * @param <T> type parameter of the contained {@link ChannelData}
     */
    static class BlobData<T> {
        /** Path of the blob. */
        private final String path;
        /** Chunks of channel data stored in the blob. */
        private final List<List<ChannelData<T>>> data;

        /**
         * @param path path of the blob
         * @param data chunks of channel data stored in the blob; kept by reference, not copied
         */
        BlobData(String path, List<List<ChannelData<T>>> data) {
            this.data = data;
            this.path = path;
        }

        /** @return path of the blob */
        String getPath() {
            return path;
        }

        /** @return chunks of channel data stored in the blob */
        List<List<ChannelData<T>>> getData() {
            return data;
        }
    }
}

