/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import com.codahale.metrics.Timer;
import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
import net.snowflake.ingest.streaming.internal.BlobBuilder;
import net.snowflake.ingest.streaming.internal.BlobMetadata;
import net.snowflake.ingest.streaming.internal.ChannelCache;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChunkMetadata;
import net.snowflake.ingest.streaming.internal.RegisterService;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.streaming.internal.StreamingIngestStage;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.VectorSchemaRoot;

class FlushService<T> {
    /** Logger shared by all instances of this class. */
    private static final Logging logger = new Logging(FlushService.class);
    /** Sequence number embedded in generated blob file names (incremented by getFilePath). */
    private final AtomicLong counter;
    /** Client that owns this service; source of parameters, metrics and telemetry. */
    private final SnowflakeStreamingIngestClientInternal<T> owningClient;
    /** Single scheduled thread that periodically invokes flush(false) and runs the flush stages. */
    @VisibleForTesting
    ScheduledExecutorService flushWorker;
    /** Single thread on which registerService.registerBlobs is executed. */
    @VisibleForTesting
    ExecutorService registerWorker;
    /** Fixed-size pool on which buildAndUpload tasks are executed. */
    @VisibleForTesting
    ExecutorService buildUploadWorkers;
    /** Cache iterated by distributeFlushTasks to collect channel data. */
    private final ChannelCache<T> channelCache;
    /** Stage that blobs are uploaded to; also supplies the client prefix used in file names. */
    private final StreamingIngestStage targetStage;
    /** Receives the list of (blob, upload future) pairs produced by distributeFlushTasks. */
    private final RegisterService<T> registerService;
    /** Set by setNeedFlush(); cleared after a flush has been distributed. */
    @VisibleForTesting
    volatile boolean isNeedFlush;
    /** Wall-clock time in milliseconds of construction or of the most recent distributed flush. */
    @VisibleForTesting
    volatile long lastFlushTime;
    /** When true, flush() only flushes if forced and a null client prefix is replaced by a test value. */
    private final boolean isTestMode;
    /** Timer contexts keyed by blob file path; filled in distributeFlushTasks and passed to registerBlobs. */
    private final Map<String, Timer.Context> latencyTimerContextMap;
    /** Blob format version read once from the client's parameter provider. */
    private final Constants.BdecVersion bdecVersion;

    /**
     * Creates a flush service that uploads to a caller-supplied stage.
     *
     * @param client the client that owns this service
     * @param cache the channel cache whose channels will be flushed
     * @param targetStage the stage blobs are uploaded to
     * @param isTestMode whether the service runs in test mode
     */
    FlushService(SnowflakeStreamingIngestClientInternal<T> client, ChannelCache<T> cache, StreamingIngestStage targetStage, boolean isTestMode) {
        // Plain collaborators handed in by the caller.
        this.owningClient = client;
        this.channelCache = cache;
        this.targetStage = targetStage;
        this.isTestMode = isTestMode;

        // Internal state.
        this.counter = new AtomicLong(0L);
        this.registerService = new RegisterService<>(client, isTestMode);
        this.isNeedFlush = false;
        this.lastFlushTime = System.currentTimeMillis();
        this.latencyTimerContextMap = new ConcurrentHashMap<>();
        this.bdecVersion = client.getParameterProvider().getBlobFormatVersion();

        // Must run last: the workers read the fields initialised above.
        createWorkers();
    }

    /**
     * Creates a flush service that builds its own {@link StreamingIngestStage} from the client's
     * role, HTTP client, request builder and name.
     *
     * @param client the client that owns this service
     * @param cache the channel cache whose channels will be flushed
     * @param isTestMode whether the service runs in test mode
     * @throws SFException with {@code UNABLE_TO_CONNECT_TO_STAGE} if the stage cannot be created
     */
    FlushService(SnowflakeStreamingIngestClientInternal<T> client, ChannelCache<T> cache, boolean isTestMode) {
        this.owningClient = client;
        this.channelCache = cache;
        try {
            this.targetStage = new StreamingIngestStage(isTestMode, client.getRole(), client.getHttpClient(), client.getRequestBuilder(), client.getName());
        }
        catch (IOException | SnowflakeSQLException err) {
            throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, new Object[0]);
        }
        this.registerService = new RegisterService<T>(client, isTestMode);
        this.counter = new AtomicLong(0L);
        this.isNeedFlush = false;
        this.lastFlushTime = System.currentTimeMillis();
        this.isTestMode = isTestMode;
        // The map is written on the flush thread (distributeFlushTasks) and handed to the register
        // thread (registerFuture), so it must be a concurrent map, as in the other constructor.
        this.latencyTimerContextMap = new ConcurrentHashMap<String, Timer.Context>();
        this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
        // Must run last: the workers read the fields initialised above.
        this.createWorkers();
    }

    /**
     * Schedules a task on the flush worker that samples the process CPU load and records it
     * (as a percentage) in the client's CPU histogram, if that histogram exists.
     *
     * @return a future that completes once the sample has been taken (or skipped)
     */
    private CompletableFuture<Void> statsFuture() {
        Runnable recordCpuLoad = () -> {
            if (this.owningClient.cpuHistogram == null) {
                return;
            }
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            long cpuPercent = (long)(osBean.getProcessCpuLoad() * 100.0);
            this.owningClient.cpuHistogram.update(cpuPercent);
        };
        return CompletableFuture.runAsync(recordCpuLoad, this.flushWorker);
    }

    /**
     * Schedules, on the flush worker, a task that logs the flush, distributes the flush tasks and
     * then resets the flush bookkeeping ({@code isNeedFlush} and {@code lastFlushTime}).
     *
     * @param isForce whether the flush was explicitly forced
     * @param timeDiffMillis milliseconds since the last flush, as measured by the caller
     * @return a future that completes when the task has run
     */
    private CompletableFuture<Void> distributeFlush(boolean isForce, long timeDiffMillis) {
        Runnable flushTask = () -> {
            logFlushTask(isForce, timeDiffMillis);
            distributeFlushTasks();
            // Reset the bookkeeping only after the tasks have been handed out.
            isNeedFlush = false;
            lastFlushTime = System.currentTimeMillis();
        };
        return CompletableFuture.runAsync(flushTask, flushWorker);
    }

    /**
     * Logs that a flush task is being submitted: at trace level when trace logging is enabled,
     * otherwise at debug level but only if a flush was requested or forced.
     *
     * @param isForce whether the flush was explicitly forced
     * @param timeDiffMillis milliseconds since the last flush, as measured by the caller
     */
    private void logFlushTask(boolean isForce, long timeDiffMillis) {
        String message = String.format(
            "Submit forced or ad-hoc flush task on client=%s, isForce=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
            this.owningClient.getName(),
            isForce,
            this.isNeedFlush,
            timeDiffMillis,
            System.currentTimeMillis() - this.lastFlushTime);
        if (logger.isTraceEnabled()) {
            logger.logTrace(message);
        } else if (this.isNeedFlush || isForce) {
            logger.logDebug(message);
        }
    }

    /**
     * Schedules blob registration on the register worker, passing along the latency timer map.
     *
     * @return a future that completes when {@code registerBlobs} returns
     */
    private CompletableFuture<Void> registerFuture() {
        Runnable registerTask = () -> {
            this.registerService.registerBlobs(this.latencyTimerContextMap);
        };
        return CompletableFuture.runAsync(registerTask, this.registerWorker);
    }

    /**
     * Runs the flush pipeline if needed: CPU stats, then flush distribution, then blob registration.
     *
     * <p>A flush happens when {@code isForce} is true, or — outside test mode — when a flush was
     * requested via {@code setNeedFlush()} or the configured buffer flush interval has elapsed.
     * Otherwise only the CPU stats task is scheduled.
     *
     * @param isForce whether to flush regardless of the other conditions
     * @return a future that completes when the scheduled stages have finished
     */
    CompletableFuture<Void> flush(boolean isForce) {
        long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime;
        boolean shouldFlush = isForce
            || (!this.isTestMode
                && (this.isNeedFlush
                    || timeDiffMillis >= this.owningClient.getParameterProvider().getBufferFlushIntervalInMs()));
        if (!shouldFlush) {
            return this.statsFuture();
        }
        return this.statsFuture()
            .thenCompose(ignored -> this.distributeFlush(isForce, timeDiffMillis))
            .thenCompose(ignored -> this.registerFuture());
    }

    /**
     * Creates the three executors used by this service:
     * <ul>
     *   <li>{@code flushWorker}: one scheduled thread that calls {@code flush(false)} with a fixed
     *       delay taken from the parameter provider, and logs/reports any asynchronous failure;</li>
     *   <li>{@code registerWorker}: one thread for blob registration;</li>
     *   <li>{@code buildUploadWorkers}: a fixed pool sized
     *       {@code availableProcessors * (1 + ioTimeCpuRatio)}.</li>
     * </ul>
     */
    private void createWorkers() {
        ThreadFactory flushThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-flush-thread").build();
        this.flushWorker = Executors.newSingleThreadScheduledExecutor(flushThreadFactory);
        this.flushWorker.scheduleWithFixedDelay(() -> this.flush(false).exceptionally(e -> {
            // The throwable is normally a wrapper around the real failure, but a cause is not
            // guaranteed. Fall back to the throwable itself so the handler cannot NPE and lose the
            // error report.
            Throwable cause = e.getCause() == null ? e : e.getCause();
            String errorMessage = String.format("Background flush task failed, client=%s, exception=%s, detail=%s, trace=%s.", this.owningClient.getName(), cause, cause.getMessage(), Utils.getStackTrace(cause));
            logger.logError(errorMessage);
            if (this.owningClient.getTelemetryService() != null) {
                this.owningClient.getTelemetryService().reportClientFailure(this.getClass().getSimpleName(), errorMessage);
            }
            return null;
        }), 0L, this.owningClient.getParameterProvider().getBufferFlushCheckIntervalInMs(), TimeUnit.MILLISECONDS);

        ThreadFactory registerThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-register-thread").build();
        this.registerWorker = Executors.newSingleThreadExecutor(registerThreadFactory);

        ThreadFactory buildUploadThreadFactory = new ThreadFactoryBuilder().setNameFormat("ingest-build-upload-thread-%d").build();
        // Capping an int at Integer.MAX_VALUE never changes the value; kept as in the original.
        int buildUploadThreadCount = Math.min(Runtime.getRuntime().availableProcessors() * (1 + this.owningClient.getParameterProvider().getIOTimeCpuRatio()), Integer.MAX_VALUE);
        this.buildUploadWorkers = Executors.newFixedThreadPool(buildUploadThreadCount, buildUploadThreadFactory);
        logger.logInfo("Create {} threads for build/upload blobs for client={}, total available processors={}", buildUploadThreadCount, this.owningClient.getName(), Runtime.getRuntime().availableProcessors());
    }

    /**
     * Walks the channel cache, groups the pending data of valid channels into blobs, submits one
     * asynchronous build-and-upload task per non-empty blob to {@code buildUploadWorkers}, and
     * finally hands every (blob, upload future) pair to the register service.
     */
    void distributeFlushTasks() {
        Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>> itr = this.channelCache.iterator();
        ArrayList blobs = new ArrayList();
        // Each pass of the outer loop assembles at most one blob.
        while (itr.hasNext()) {
            ArrayList blobData = new ArrayList();
            AtomicReference<Float> totalBufferSizeInBytes = new AtomicReference<Float>(Float.valueOf(0.0f));
            // Generating the path also advances the file-name counter (see getFilePath).
            String filePath = this.getFilePath(this.targetStage.getClientPrefix());
            // Keep adding cache entries to this blob until the accumulated buffer size exceeds
            // 2.56E8f. The size is checked before an entry is added, so a blob can end up larger
            // than the threshold. NOTE(review): 2.56E8f is presumably the maximum blob size in
            // bytes (an inlined constant) — confirm.
            while (itr.hasNext() && totalBufferSizeInBytes.get().floatValue() <= 2.56E8f) {
                // NOTE(review): each cache entry appears to hold the channels of one table — confirm.
                ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> table = itr.next().getValue();
                // The lambda below may run on several threads (parallel stream), hence the
                // synchronized list and the atomic size accumulator.
                List channelsDataPerTable = Collections.synchronizedList(new ArrayList());
                table.values().parallelStream().forEach(channel -> {
                    ChannelData data;
                    // Only valid channels that actually return data contribute to the blob.
                    if (channel.isValid() && (data = channel.getData(filePath)) != null) {
                        channelsDataPerTable.add(data);
                        totalBufferSizeInBytes.updateAndGet(v -> Float.valueOf(v.floatValue() + data.getBufferSize()));
                    }
                });
                if (channelsDataPerTable.isEmpty()) continue;
                blobData.add(channelsDataPerTable);
            }
            if (blobData.isEmpty()) {
                // Nothing to upload: undo the counter increment done by getFilePath above.
                this.counter.decrementAndGet();
                continue;
            }
            if (this.owningClient.flushLatency != null) {
                // Start the flush-latency timer for this blob, keyed by its file path.
                this.latencyTimerContextMap.putIfAbsent(filePath, this.owningClient.flushLatency.time());
            }
            blobs.add(new Pair(new BlobData(filePath, blobData), CompletableFuture.supplyAsync(() -> {
                try {
                    return this.buildAndUpload(filePath, blobData);
                }
                catch (Throwable e) {
                    // Log and report the underlying cause when there is one.
                    Throwable ex = e.getCause() == null ? e : e.getCause();
                    String errorMessage = String.format("Building blob failed, client=%s, file=%s, exception=%s, detail=%s, trace=%s, all channels in the blob will be invalidated", this.owningClient.getName(), filePath, ex, ex.getMessage(), Utils.getStackTrace(ex));
                    logger.logError(errorMessage);
                    if (this.owningClient.getTelemetryService() != null) {
                        this.owningClient.getTelemetryService().reportClientFailure(this.getClass().getSimpleName(), errorMessage);
                    }
                    // An IOException invalidates the blob's channels and yields a null result;
                    // every other failure is rethrown as an SFException with a matching error code.
                    if (e instanceof IOException) {
                        this.invalidateAllChannelsInBlob(blobData);
                        return null;
                    }
                    if (e instanceof NoSuchAlgorithmException) {
                        throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE, new Object[0]);
                    }
                    // Non-short-circuit '|' is used here; all five instanceof checks are evaluated.
                    if (e instanceof InvalidAlgorithmParameterException | e instanceof NoSuchPaddingException | e instanceof IllegalBlockSizeException | e instanceof BadPaddingException | e instanceof InvalidKeyException) {
                        throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE, new Object[0]);
                    }
                    throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
                }
            }, this.buildUploadWorkers)));
            logger.logInfo("buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}", this.owningClient.getName(), filePath, this.buildUploadWorkers.toString());
        }
        // Hand all pending uploads to the register service in one call.
        this.registerService.addBlobs(blobs);
    }

    /**
     * Builds a blob from the given channel data, timing the build with the client's build-latency
     * timer when one is available, and then uploads it.
     *
     * @param filePath path the blob will be uploaded to
     * @param blobData channel data grouped in nested lists
     * @return the metadata returned by {@link #upload}
     */
    BlobMetadata buildAndUpload(String filePath, List<List<ChannelData<T>>> blobData) throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, InvalidKeyException {
        Timer.Context buildTimer = Utils.createTimerContext(this.owningClient.buildLatency);
        BlobBuilder.Blob builtBlob = BlobBuilder.constructBlobAndMetadata(filePath, blobData, this.bdecVersion);
        // The timer context is null when no build-latency timer is configured.
        if (buildTimer != null) {
            buildTimer.stop();
        }

        byte[] blobBytes = builtBlob.blobBytes;
        List<ChunkMetadata> chunks = builtBlob.chunksMetadataList;
        return this.upload(filePath, blobBytes, chunks);
    }

    /**
     * Uploads a blob to the target stage, records upload metrics when the upload-latency timer is
     * available, and returns the blob's metadata (including its MD5).
     *
     * @param filePath path to upload the blob to
     * @param blob the blob bytes
     * @param metadata chunk metadata of the blob
     * @return metadata describing the uploaded blob
     * @throws NoSuchAlgorithmException if the MD5 computation is unavailable
     */
    BlobMetadata upload(String filePath, byte[] blob, List<ChunkMetadata> metadata) throws NoSuchAlgorithmException {
        logger.logInfo("Start uploading file={}, size={}", filePath, blob.length);
        long uploadStartMillis = System.currentTimeMillis();

        Timer.Context uploadTimer = Utils.createTimerContext(this.owningClient.uploadLatency);
        this.targetStage.put(filePath, blob);

        // Metrics are only recorded when a timer context was created.
        if (uploadTimer != null) {
            uploadTimer.stop();
            this.owningClient.uploadThroughput.mark((long)blob.length);
            this.owningClient.blobSizeHistogram.update(blob.length);
            long totalRowCount = metadata.stream()
                .mapToLong(chunk -> chunk.getEpInfo().getRowCount())
                .sum();
            this.owningClient.blobRowCountHistogram.update(totalRowCount);
        }

        long elapsedMillis = System.currentTimeMillis() - uploadStartMillis;
        logger.logInfo("Finish uploading file={}, size={}, timeInMillis={}", filePath, blob.length, elapsedMillis);

        String md5 = BlobBuilder.computeMD5(blob);
        return BlobMetadata.createBlobMetadata(filePath, md5, this.bdecVersion, metadata);
    }

    /**
     * Shuts down all three executors, waiting up to 300 seconds for each in turn
     * (flush, register, build/upload). If any wait times out, the remaining waits are skipped and
     * all executors are shut down forcibly.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     */
    void shutdown() throws InterruptedException {
        this.flushWorker.shutdown();
        this.registerWorker.shutdown();
        this.buildUploadWorkers.shutdown();

        boolean allTerminated =
            this.flushWorker.awaitTermination(300L, TimeUnit.SECONDS)
                && this.registerWorker.awaitTermination(300L, TimeUnit.SECONDS)
                && this.buildUploadWorkers.awaitTermination(300L, TimeUnit.SECONDS);
        if (allTerminated) {
            return;
        }

        logger.logWarn("Tasks can't be terminated within the timeout, force shutdown now.");
        this.flushWorker.shutdownNow();
        this.registerWorker.shutdownNow();
        this.buildUploadWorkers.shutdownNow();
    }

    /** Requests a flush; the flag is read by {@code flush(boolean)} on its next invocation. */
    void setNeedFlush() {
        isNeedFlush = true;
    }

    /**
     * Generates a blob file path for the current time in UTC.
     *
     * @param clientPrefix prefix embedded in the file name
     * @return the generated file path
     */
    private String getFilePath(String clientPrefix) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        return this.getFilePath(Calendar.getInstance(utc), clientPrefix);
    }

    /**
     * Generates a blob file path of the form
     * {@code year/month/day/hour/minute/<base36 epoch seconds>_<prefix>_<thread id>_<counter>.bdec}
     * and advances the file-name counter.
     *
     * <p>In test mode a {@code null} prefix is replaced by {@code "testPrefix"}; otherwise the
     * prefix is validated with {@code Utils.assertStringNotNullOrEmpty}.
     *
     * @param calendar supplies the date/time components and the timestamp
     * @param clientPrefix prefix embedded in the file name
     * @return the generated file path
     */
    String getFilePath(Calendar calendar, String clientPrefix) {
        String prefix = (this.isTestMode && clientPrefix == null) ? "testPrefix" : clientPrefix;
        Utils.assertStringNotNullOrEmpty("client prefix", prefix);

        // Directory part: year/month/day/hour/minute (Calendar months are zero-based).
        StringBuilder path = new StringBuilder();
        path.append(calendar.get(Calendar.YEAR)).append("/")
            .append(calendar.get(Calendar.MONTH) + 1).append("/")
            .append(calendar.get(Calendar.DAY_OF_MONTH)).append("/")
            .append(calendar.get(Calendar.HOUR_OF_DAY)).append("/")
            .append(calendar.get(Calendar.MINUTE)).append("/");

        // File name part.
        long epochSeconds = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis());
        long threadId = Thread.currentThread().getId();
        path.append(Long.toString(epochSeconds, Character.MAX_RADIX)).append("_")
            .append(prefix).append("_")
            .append(threadId).append("_")
            .append(this.counter.getAndIncrement())
            .append(".").append("bdec");
        return path.toString();
    }

    /**
     * Invalidates every channel that contributed data to a blob. For each channel data entry, the
     * vectors are closed when they are a {@link VectorSchemaRoot}, and the channel is invalidated
     * in the owning client's channel cache if its sequencer still matches.
     *
     * @param blobData channel data grouped in nested lists
     */
    <CD> void invalidateAllChannelsInBlob(List<List<ChannelData<CD>>> blobData) {
        for (List<ChannelData<CD>> chunkData : blobData) {
            for (ChannelData<CD> channelData : chunkData) {
                if (channelData.getVectors() instanceof VectorSchemaRoot) {
                    ((VectorSchemaRoot)channelData.getVectors()).close();
                }
                this.owningClient.getChannelCache().invalidateChannelIfSequencersMatch(
                    channelData.getChannelContext().getDbName(),
                    channelData.getChannelContext().getSchemaName(),
                    channelData.getChannelContext().getTableName(),
                    channelData.getChannelContext().getName(),
                    channelData.getChannelContext().getChannelSequencer());
            }
        }
    }

    /**
     * Returns the client prefix reported by the target stage.
     *
     * @return the stage's client prefix
     */
    String getClientPrefix() {
        return targetStage.getClientPrefix();
    }

    /**
     * Reports whether callers should throttle because the build/upload pool has more queued tasks
     * than there are available processors. Logs a warning when throttling applies.
     *
     * @return {@code true} if the queue size exceeds the number of available processors
     */
    boolean throttleDueToQueuedFlushTasks() {
        ThreadPoolExecutor uploadPool = (ThreadPoolExecutor)this.buildUploadWorkers;
        int queuedTasks = uploadPool.getQueue().size();
        int processors = Runtime.getRuntime().availableProcessors();
        if (queuedTasks <= processors) {
            return false;
        }
        logger.logWarn("Throttled due too many queue flush tasks (probably because of slow uploading speed), client={}, buildUploadWorkers stats={}", this.owningClient.getName(), this.buildUploadWorkers.toString());
        return true;
    }

    /**
     * Immutable holder pairing a blob's file path with the channel data that goes into it.
     *
     * @param <T> type parameter of the contained {@link ChannelData}
     */
    static class BlobData<T> {
        /** Path the blob is (or will be) uploaded to. */
        private final String filePath;
        /** Channel data grouped in nested lists; the list is stored as given, not copied. */
        private final List<List<ChannelData<T>>> data;

        /**
         * @param filePath path of the blob
         * @param data channel data belonging to the blob
         */
        BlobData(String filePath, List<List<ChannelData<T>>> data) {
            this.data = data;
            this.filePath = filePath;
        }

        /** @return the blob's file path */
        String getFilePath() {
            return filePath;
        }

        /** @return the channel data passed to the constructor */
        List<List<ChannelData<T>>> getData() {
            return data;
        }
    }
}

