/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import com.google.common.annotations.VisibleForTesting;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.internal.AbstractRowBuffer;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChannelFlushContext;
import net.snowflake.ingest.streaming.internal.ChannelRuntimeState;
import net.snowflake.ingest.streaming.internal.ChannelsStatusResponse;
import net.snowflake.ingest.streaming.internal.ClientBufferParameters;
import net.snowflake.ingest.streaming.internal.ColumnMetadata;
import net.snowflake.ingest.streaming.internal.ColumnProperties;
import net.snowflake.ingest.streaming.internal.MemoryInfoProvider;
import net.snowflake.ingest.streaming.internal.MemoryInfoProviderFromRuntime;
import net.snowflake.ingest.streaming.internal.RowBuffer;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/**
 * Internal implementation of {@link SnowflakeStreamingIngestChannel}.
 *
 * <p>A channel couples a {@link RowBuffer} (which accumulates inserted rows) with the
 * {@link ChannelFlushContext} describing where the rows go and the {@link ChannelRuntimeState}
 * tracking validity and sequencing. Flushing and commit verification are delegated to the owning
 * {@link SnowflakeStreamingIngestClientInternal}.
 *
 * @param <T> type of the data held by the row buffer and returned inside {@link ChannelData}
 */
class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIngestChannel {
    private static final Logging logger = new Logging(SnowflakeStreamingIngestChannelInternal.class);

    /** Upper bound on the number of sleep cycles a single insert call may be throttled for. */
    private static final int INSERT_THROTTLE_MAX_RETRY_COUNT = 60;

    private final ChannelFlushContext channelFlushContext;
    private final RowBuffer<T> rowBuffer;
    // volatile: written by markClosed() and read by isClosed() without any locking.
    private volatile boolean isClosed = false;
    private final SnowflakeStreamingIngestClientInternal<T> owningClient;
    private final ChannelRuntimeState channelState;
    private final Map<String, ColumnProperties> tableColumns;

    /**
     * Creates a channel using the blob format version configured on the client's parameter
     * provider.
     *
     * @see #SnowflakeStreamingIngestChannelInternal(String, String, String, String, String, Long,
     *     Long, SnowflakeStreamingIngestClientInternal, String, Long,
     *     OpenChannelRequest.OnErrorOption, ZoneId, Constants.BdecVersion)
     */
    SnowflakeStreamingIngestChannelInternal(String name, String dbName, String schemaName, String tableName, String offsetToken, Long channelSequencer, Long rowSequencer, SnowflakeStreamingIngestClientInternal<T> client, String encryptionKey, Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneOffset defaultTimezone) {
        this(name, dbName, schemaName, tableName, offsetToken, channelSequencer, rowSequencer, client, encryptionKey, encryptionKeyId, onErrorOption, defaultTimezone, client.getParameterProvider().getBlobFormatVersion());
    }

    /**
     * Creates a channel with an explicit blob format version.
     *
     * @param name channel name
     * @param dbName database name
     * @param schemaName schema name
     * @param tableName table name
     * @param offsetToken initial offset token handed to the runtime state
     * @param channelSequencer channel sequencer stored in the flush context
     * @param rowSequencer initial row sequencer handed to the runtime state
     * @param client owning client; also used to derive the buffer parameters
     * @param encryptionKey encryption key stored in the flush context
     * @param encryptionKeyId encryption key id stored in the flush context
     * @param onErrorOption error handling option passed to the row buffer
     * @param defaultTimezone default timezone passed to the row buffer
     * @param bdecVersion blob format version passed to the row buffer
     */
    SnowflakeStreamingIngestChannelInternal(String name, String dbName, String schemaName, String tableName, String offsetToken, Long channelSequencer, Long rowSequencer, SnowflakeStreamingIngestClientInternal<T> client, String encryptionKey, Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, Constants.BdecVersion bdecVersion) {
        this.owningClient = client;
        // The flush context must exist before the row buffer is built: getFullyQualifiedName()
        // below reads it.
        this.channelFlushContext = new ChannelFlushContext(name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId);
        // NOTE(review): the trailing `true` is presumably the initial "valid" flag -- confirm
        // against ChannelRuntimeState.
        this.channelState = new ChannelRuntimeState(offsetToken, rowSequencer, true);
        this.rowBuffer = AbstractRowBuffer.createRowBuffer(onErrorOption, defaultTimezone, bdecVersion, this.getFullyQualifiedName(), this::collectRowSize, this.channelState, new ClientBufferParameters(this.owningClient));
        this.tableColumns = new HashMap<>();
        logger.logInfo("Channel={} created for table={}", this.channelFlushContext.getName(), this.channelFlushContext.getTableName());
    }

    /** @return the fully qualified channel name, as reported by the flush context */
    @Override
    public String getFullyQualifiedName() {
        return this.channelFlushContext.getFullyQualifiedName();
    }

    /** @return the channel name */
    @Override
    public String getName() {
        return this.channelFlushContext.getName();
    }

    /** @return the database name */
    @Override
    public String getDBName() {
        return this.channelFlushContext.getDbName();
    }

    /** @return the schema name */
    @Override
    public String getSchemaName() {
        return this.channelFlushContext.getSchemaName();
    }

    /** @return the table name */
    @Override
    public String getTableName() {
        return this.channelFlushContext.getTableName();
    }

    /** @return the channel sequencer stored in the flush context */
    Long getChannelSequencer() {
        return this.channelFlushContext.getChannelSequencer();
    }

    /** @return the runtime state object shared with the row buffer */
    @VisibleForTesting
    ChannelRuntimeState getChannelState() {
        return this.channelState;
    }

    /** @return the fully qualified table name, as reported by the flush context */
    @Override
    public String getFullyQualifiedTableName() {
        return this.channelFlushContext.getFullyQualifiedTableName();
    }

    /**
     * Flushes the row buffer and tags the resulting data with this channel's flush context.
     *
     * @param filePath file path forwarded to {@link RowBuffer#flush}
     * @return the flushed data, or {@code null} when the row buffer returned {@code null}
     */
    ChannelData<T> getData(String filePath) {
        ChannelData<T> data = this.rowBuffer.flush(filePath);
        if (data != null) {
            data.setChannelContext(this.channelFlushContext);
        }
        return data;
    }

    /** @return whether the runtime state still reports this channel as valid */
    @Override
    public boolean isValid() {
        return this.channelState.isValid();
    }

    /**
     * Marks the channel invalid, closes its row buffer and logs a warning.
     *
     * @param message reason for the invalidation; only used in the log line
     */
    void invalidate(String message) {
        this.channelState.invalidate();
        this.rowBuffer.close("invalidate");
        logger.logWarn("Channel is invalidated, name={}, channel sequencer={}, row sequencer={}, message={}", this.getFullyQualifiedName(), this.channelFlushContext.getChannelSequencer(), this.channelState.getRowSequencer(), message);
    }

    /** @return whether {@link #markClosed()} has been called on this channel */
    @Override
    public boolean isClosed() {
        return this.isClosed;
    }

    /** Flags the channel as closed and logs the transition. Does not flush or release anything. */
    void markClosed() {
        this.isClosed = true;
        logger.logInfo("Channel is marked as closed, name={}, channel sequencer={}, row sequencer={}", this.getFullyQualifiedName(), this.channelFlushContext.getChannelSequencer(), this.channelState.getRowSequencer());
    }

    /**
     * Triggers a client flush if this channel has buffered data.
     *
     * @param closing {@code true} when called as part of {@link #close()}, which allows the flush
     *     to proceed even though the channel is already marked closed
     * @return an already completed future when the buffer is empty, otherwise the future returned
     *     by the owning client's flush
     * @throws SFException with {@link ErrorCode#CLOSED_CHANNEL} if the channel is closed and
     *     {@code closing} is {@code false}
     */
    CompletableFuture<Void> flush(boolean closing) {
        if (this.isClosed() && !closing) {
            throw new SFException(ErrorCode.CLOSED_CHANNEL, this.getFullyQualifiedName());
        }
        if (this.rowBuffer.getSize() == 0.0f) {
            // Nothing buffered, so there is nothing to wait for.
            return CompletableFuture.completedFuture(null);
        }
        return this.owningClient.flush(false);
    }

    /**
     * Closes the channel: marks it closed, flushes pending rows, then asynchronously verifies
     * that everything was committed, closes the row buffer and deregisters the channel from the
     * client.
     *
     * @return a future that completes exceptionally with
     *     {@link ErrorCode#CHANNELS_WITH_UNCOMMITTED_ROWS} when the channel is invalid or still
     *     has uncommitted rows after the flush; an already completed future when the channel was
     *     closed before
     * @throws SFException with {@link ErrorCode#INVALID_CHANNEL} if the channel is invalid on
     *     entry
     */
    @Override
    public CompletableFuture<Void> close() {
        this.checkValidation();
        if (this.isClosed()) {
            return CompletableFuture.completedFuture(null);
        }
        this.markClosed();
        return this.flush(true).thenRunAsync(() -> {
            List<SnowflakeStreamingIngestChannelInternal<?>> uncommittedChannels = this.owningClient.verifyChannelsAreFullyCommitted(Collections.singletonList(this));
            // Cleanup runs before the failure check so that it happens even when the check throws.
            this.rowBuffer.close("close");
            this.owningClient.removeChannelIfSequencersMatch(this);
            if (!this.isValid() || !uncommittedChannels.isEmpty()) {
                throw new SFException(ErrorCode.CHANNELS_WITH_UNCOMMITTED_ROWS, uncommittedChannels.stream().map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName).collect(Collectors.toList()));
            }
        });
    }

    /**
     * Configures the row buffer with the table's columns and records their properties.
     *
     * <p>Columns whose name is already recorded keep their existing {@link ColumnProperties}.
     *
     * @param columns column metadata of the target table
     */
    void setupSchema(List<ColumnMetadata> columns) {
        logger.logDebug("Setup schema for channel={}, schema={}", this.getFullyQualifiedName(), columns);
        this.rowBuffer.setupSchema(columns);
        columns.forEach(c -> this.tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c)));
    }

    /**
     * Inserts a single row; equivalent to {@link #insertRows} with a one-element list.
     *
     * @param row column name to value mapping
     * @param offsetToken offset token associated with the row
     * @return the validation response produced by the row buffer
     */
    @Override
    public InsertValidationResponse insertRow(Map<String, Object> row, String offsetToken) {
        return this.insertRows(Collections.singletonList(row), offsetToken);
    }

    /**
     * Inserts a batch of rows into the row buffer.
     *
     * <p>The call may first block while inserts are being throttled. Each row map is shallow
     * copied before being handed to the buffer, so later changes the caller makes to its own maps
     * do not affect the buffered rows. When the buffer reaches the configured maximum channel
     * size the owning client is asked to flush.
     *
     * @param rows rows to insert, each a column name to value mapping
     * @param offsetToken offset token associated with the batch
     * @return the validation response produced by the row buffer
     * @throws SFException with {@link ErrorCode#INVALID_CHANNEL} or
     *     {@link ErrorCode#CLOSED_CHANNEL} if the channel cannot accept rows, or
     *     {@link ErrorCode#INTERNAL_ERROR} if the thread is interrupted while throttled
     */
    @Override
    public InsertValidationResponse insertRows(Iterable<Map<String, Object>> rows, String offsetToken) {
        this.throttleInsertIfNeeded(new MemoryInfoProviderFromRuntime());
        this.checkValidation();
        if (this.isClosed()) {
            throw new SFException(ErrorCode.CLOSED_CHANNEL, this.getFullyQualifiedName());
        }
        LinkedList<Map<String, Object>> rowsCopy = new LinkedList<>();
        // LinkedHashMap keeps the caller's column iteration order in the copy.
        rows.forEach(r -> rowsCopy.add(new LinkedHashMap<>(r)));
        InsertValidationResponse response = this.rowBuffer.insertRows(rowsCopy, offsetToken);
        if (this.rowBuffer.getSize() >= (float)this.owningClient.getParameterProvider().getMaxChannelSizeInBytes()) {
            this.owningClient.setNeedFlush();
        }
        return response;
    }

    /**
     * Reports an inserted row's size to the client's input throughput meter, if one is set.
     *
     * @param rowSize size of the row; truncated to {@code long} before being recorded
     */
    void collectRowSize(float rowSize) {
        if (this.owningClient.inputThroughput != null) {
            this.owningClient.inputThroughput.mark((long)rowSize);
        }
    }

    /**
     * Queries the owning client for this channel's status and returns the persisted offset token.
     *
     * @return the persisted offset token from the status response
     * @throws SFException with {@link ErrorCode#INVALID_CHANNEL} if the channel is invalid, or
     *     {@link ErrorCode#CHANNEL_STATUS_INVALID} if the response status code is non-zero
     */
    @Override
    public String getLatestCommittedOffsetToken() {
        this.checkValidation();
        ChannelsStatusResponse.ChannelStatusResponseDTO response = this.owningClient.getChannelsStatus(Collections.singletonList(this)).getChannels().get(0);
        if (response.getStatusCode() != 0L) {
            throw new SFException(ErrorCode.CHANNEL_STATUS_INVALID, this.getName(), response.getStatusCode());
        }
        return response.getPersistedOffsetToken();
    }

    /**
     * @return the column name to properties map populated by {@link #setupSchema}; this is the
     *     channel's own map instance, not a copy
     */
    @Override
    public Map<String, ColumnProperties> getTableSchema() {
        return this.tableColumns;
    }

    /**
     * Blocks the calling thread while the runtime is low on memory or the flush service reports
     * too many queued flush tasks, sleeping for the configured interval between checks and giving
     * up after {@link #INSERT_THROTTLE_MAX_RETRY_COUNT} sleeps.
     *
     * @param memoryInfoProvider source of the memory figures used for the low-memory check
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} if the thread is interrupted
     *     while sleeping; the thread's interrupt status is restored before throwing
     */
    void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {
        int retry;
        long insertThrottleIntervalInMs = this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs();
        // The throttle checks deliberately come before the retry bound: hasLowRuntimeMemory logs
        // as a side effect, and this order keeps that logging on the final bounded iteration too.
        for (retry = 0; (this.hasLowRuntimeMemory(memoryInfoProvider) || this.owningClient.getFlushService() != null && this.owningClient.getFlushService().throttleDueToQueuedFlushTasks()) && retry < INSERT_THROTTLE_MAX_RETRY_COUNT; ++retry) {
            try {
                Thread.sleep(insertThrottleIntervalInMs);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new SFException(ErrorCode.INTERNAL_ERROR, "Insert throttle get interrupted");
            }
        }
        if (retry > 0) {
            logger.logInfo("Insert throttled for a total of {} milliseconds, retryCount={}, client={}, channel={}", (long)retry * insertThrottleIntervalInMs, retry, this.owningClient.getName(), this.getFullyQualifiedName());
        }
    }

    /**
     * Decides whether inserts should be throttled because of memory pressure.
     *
     * <p>Memory is considered low only when free memory is below BOTH the absolute byte threshold
     * and the percentage threshold. Free memory counts the provider's free memory plus the
     * headroom between its max and total memory. The percentage is taken against the configured
     * memory limit, or the provider's max memory when the limit is {@code -1}.
     *
     * @param memoryInfoProvider source of the memory figures
     * @return {@code true} if memory is low; a warning is logged in that case
     */
    private boolean hasLowRuntimeMemory(MemoryInfoProvider memoryInfoProvider) {
        int insertThrottleThresholdInBytes = this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes();
        int insertThrottleThresholdInPercentage = this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage();
        long maxMemoryLimitInBytes = this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes();
        long maxMemory = maxMemoryLimitInBytes == -1L ? memoryInfoProvider.getMaxMemory() : maxMemoryLimitInBytes;
        long freeMemory = memoryInfoProvider.getFreeMemory() + (memoryInfoProvider.getMaxMemory() - memoryInfoProvider.getTotalMemory());
        boolean hasLowRuntimeMemory = freeMemory < (long)insertThrottleThresholdInBytes && freeMemory * 100L / maxMemory < (long)insertThrottleThresholdInPercentage;
        if (hasLowRuntimeMemory) {
            logger.logWarn("Throttled due to memory pressure, client={}, channel={}.", this.owningClient.getName(), this.getFullyQualifiedName());
            Utils.showMemory();
        }
        return hasLowRuntimeMemory;
    }

    /**
     * Ensures the channel is still valid; if not, deregisters it from the client, closes the row
     * buffer and fails.
     *
     * @throws SFException with {@link ErrorCode#INVALID_CHANNEL} if the channel is invalid
     */
    private void checkValidation() {
        if (!this.isValid()) {
            this.owningClient.removeChannelIfSequencersMatch(this);
            this.rowBuffer.close("checkValidation");
            throw new SFException(ErrorCode.INVALID_CHANNEL, this.getFullyQualifiedName());
        }
    }

    /** @return the row buffer backing this channel */
    RowBuffer<T> getRowBuffer() {
        return this.rowBuffer;
    }

    /** @return the flush context describing this channel */
    @VisibleForTesting
    public ChannelFlushContext getChannelContext() {
        return this.channelFlushContext;
    }
}

