/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.internal.ArrowRowBuffer;
import net.snowflake.ingest.streaming.internal.ChannelData;
import net.snowflake.ingest.streaming.internal.ChannelRuntimeState;
import net.snowflake.ingest.streaming.internal.ColumnMetadata;
import net.snowflake.ingest.streaming.internal.EpInfo;
import net.snowflake.ingest.streaming.internal.FileColumnProperties;
import net.snowflake.ingest.streaming.internal.LiteralQuoteUtils;
import net.snowflake.ingest.streaming.internal.ParquetRowBuffer;
import net.snowflake.ingest.streaming.internal.RowBuffer;
import net.snowflake.ingest.streaming.internal.RowBufferStats;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.VisibleForTesting;

/**
 * Shared logic for {@link RowBuffer} implementations: input-column validation, row insertion
 * under the two {@link OpenChannelRequest.OnErrorOption} modes handled here, per-column
 * statistics bookkeeping, and flushing of buffered data into a {@link ChannelData}.
 *
 * <p>Subclasses supply the storage-specific pieces through the abstract methods
 * ({@link #addRow}, {@link #addTempRow}, {@link #moveTempRowsToActualBuffer},
 * {@link #getSnapshot}, ...).
 *
 * <p>{@link #insertRows} and {@link #flush} mutate the buffer only while holding an internal
 * {@link ReentrantLock}; {@link #close} is {@code synchronized} on the instance instead.
 *
 * @param <T> type of the snapshot produced by {@link #getSnapshot(String)}
 */
abstract class AbstractRowBuffer<T>
implements RowBuffer<T> {
    private static final Logging logger = new Logging(AbstractRowBuffer.class);

    /** Value reported by {@code getOrdinal()} for enum constants declared without an explicit ordinal. */
    private static final int INVALID_SERVER_SIDE_DATA_TYPE_ORDINAL = -1;

    /** Per-column statistics for the rows currently held in the buffer, keyed by column name. */
    @VisibleForTesting
    Map<String, RowBufferStats> statsMap;

    /** Per-column statistics for rows staged by the abort-on-error insert path; reset after every insert. */
    @VisibleForTesting
    Map<String, RowBufferStats> tempStatsMap;

    /** Guards buffer mutation in {@link #insertRows} and {@link #flush}. */
    private final Lock flushLock;

    /** Number of rows currently buffered. */
    @VisibleForTesting
    volatile int rowCount;

    /** Sum of the sizes reported by {@code addRow}/{@code addTempRow} since the last reset. */
    private volatile float bufferSize;

    /** Names of columns that every inserted row must contain. */
    private final Set<String> nonNullableFieldNames;

    final String channelFullyQualifiedName;

    /** Receives the total size of each successfully processed {@link #insertRows} call. */
    private final Consumer<Float> rowSizeMetric;

    final BufferAllocator allocator;
    final ChannelRuntimeState channelState;
    final OpenChannelRequest.OnErrorOption onErrorOption;
    final ZoneId defaultTimezone;

    /**
     * Stores the collaborators and starts with an empty buffer (zero rows, zero size, empty
     * statistics maps).
     */
    AbstractRowBuffer(OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, BufferAllocator allocator, String fullyQualifiedChannelName, Consumer<Float> rowSizeMetric, ChannelRuntimeState channelRuntimeState) {
        this.onErrorOption = onErrorOption;
        this.defaultTimezone = defaultTimezone;
        this.rowSizeMetric = rowSizeMetric;
        this.channelState = channelRuntimeState;
        this.channelFullyQualifiedName = fullyQualifiedChannelName;
        this.allocator = allocator;
        this.nonNullableFieldNames = new HashSet<String>();
        this.flushLock = new ReentrantLock();
        this.rowCount = 0;
        this.bufferSize = 0.0f;
        this.statsMap = new HashMap<String, RowBufferStats>();
        this.tempStatsMap = new HashMap<String, RowBufferStats>();
    }

    /**
     * Registers a column that must be present in every inserted row.
     *
     * @param nonNullableFieldName column name, in the same form that
     *     {@link LiteralQuoteUtils#unquoteColumnName} produces for row keys
     */
    void addNonNullableFieldName(String nonNullableFieldName) {
        this.nonNullableFieldNames.add(nonNullableFieldName);
    }

    /**
     * Rejects columns that declare a collation.
     *
     * @param column column metadata to check
     * @throws SFException with {@link ErrorCode#UNSUPPORTED_DATA_TYPE} if the column has a
     *     non-null collation
     */
    void validateColumnCollation(ColumnMetadata column) {
        if (column.getCollation() != null) {
            throw new SFException(ErrorCode.UNSUPPORTED_DATA_TYPE, String.format("Column %s with collation %s detected. Ingestion into collated columns is not supported", column.getName(), column.getCollation()));
        }
    }

    /** Returns the accumulated size of the rows currently buffered. */
    @Override
    public float getSize() {
        return this.bufferSize;
    }

    /**
     * Checks that a row names only known columns and provides every registered non-nullable
     * column.
     *
     * @param row the input row, keyed by the user-supplied column names
     * @param error optional error holder; when non-null it receives the extra or missing column
     *     names before the exception is thrown
     * @return the unquoted column names present in the row
     * @throws SFException with {@link ErrorCode#INVALID_ROW} if the row has unknown columns or
     *     lacks a non-nullable column
     */
    Set<String> verifyInputColumns(Map<String, Object> row, InsertValidationResponse.InsertError error) {
        // Unquoted name -> name as supplied by the caller, so errors can echo the caller's spelling.
        // NOTE(review): Collectors.toMap without a merge function throws IllegalStateException when
        // two input keys unquote to the same name; confirm that is the intended handling.
        Map<String, String> inputColNamesMap = row.keySet().stream().collect(Collectors.toMap(LiteralQuoteUtils::unquoteColumnName, value -> value));

        ArrayList<String> extraCols = new ArrayList<String>();
        for (Map.Entry<String, String> inputCol : inputColNamesMap.entrySet()) {
            if (!this.hasColumn(inputCol.getKey())) {
                extraCols.add(inputCol.getValue());
            }
        }
        if (!extraCols.isEmpty()) {
            if (error != null) {
                error.setExtraColNames(extraCols);
            }
            throw new SFException(ErrorCode.INVALID_ROW, "Extra columns: " + extraCols, "Columns not present in the table shouldn't be specified.");
        }

        ArrayList<String> missingCols = new ArrayList<String>();
        for (String columnName : this.nonNullableFieldNames) {
            if (!inputColNamesMap.containsKey(columnName)) {
                // Report the display name recorded in the column statistics.
                missingCols.add(this.statsMap.get(columnName).getColumnDisplayName());
            }
        }
        if (!missingCols.isEmpty()) {
            if (error != null) {
                error.setMissingNotNullColNames(missingCols);
            }
            throw new SFException(ErrorCode.INVALID_ROW, "Missing columns: " + missingCols, "Values for all non-nullable columns must be specified.");
        }
        return inputColNamesMap.keySet();
    }

    /**
     * Inserts a batch of rows while holding the flush lock.
     *
     * <p>With {@link OpenChannelRequest.OnErrorOption#CONTINUE} each row is validated and added
     * individually; failures are recorded in the response and the remaining rows are still
     * processed. With any other option the whole batch is staged through {@code addTempRow} and
     * only moved into the buffer if every row succeeds; the first failure propagates to the
     * caller.
     *
     * <p>On success the buffer size, the channel offset token and the row-size metric are
     * updated. Staged rows and temporary statistics are always cleared before returning.
     *
     * @param rows rows to insert, each keyed by column name
     * @param offsetToken offset token stored on the channel state after a successful insert
     * @return response holding the per-row errors collected in {@code CONTINUE} mode (empty
     *     otherwise)
     * @throws SFException if the buffer has no columns, the row count would reach
     *     {@link Integer#MAX_VALUE}, or (outside {@code CONTINUE} mode) a row is invalid
     */
    @Override
    public InsertValidationResponse insertRows(Iterable<Map<String, Object>> rows, String offsetToken) {
        if (!this.hasColumns()) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Empty column fields");
        }
        InsertValidationResponse response = new InsertValidationResponse();
        this.flushLock.lock();
        try {
            this.channelState.updateInsertStats(System.currentTimeMillis(), this.rowCount);
            float rowsSizeInBytes;
            if (this.onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE) {
                rowsSizeInBytes = this.insertRowsContinueOnError(rows, response);
            } else {
                rowsSizeInBytes = this.insertRowsAbortOnError(rows);
            }
            this.bufferSize += rowsSizeInBytes;
            this.channelState.setOffsetToken(offsetToken);
            this.rowSizeMetric.accept(rowsSizeInBytes);
        }
        finally {
            try {
                this.tempStatsMap.values().forEach(RowBufferStats::reset);
                this.clearTempRows();
            }
            finally {
                // Release the lock even if cleanup itself fails, so later callers cannot deadlock.
                this.flushLock.unlock();
            }
        }
        return response;
    }

    /**
     * Adds rows one at a time, recording a per-row error in {@code response} for each row that
     * fails instead of aborting the batch. Must be called with the flush lock held.
     *
     * @return total size reported by {@code addRow} for the rows that were added
     */
    private float insertRowsContinueOnError(Iterable<Map<String, Object>> rows, InsertValidationResponse response) {
        float addedSizeInBytes = 0.0f;
        long rowIndex = 0L;
        for (Map<String, Object> row : rows) {
            InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex);
            try {
                Set<String> inputColumnNames = this.verifyInputColumns(row, error);
                addedSizeInBytes += this.addRow(row, this.rowCount, this.statsMap, inputColumnNames);
                ++this.rowCount;
            }
            catch (SFException e) {
                error.setException(e);
                response.addError(error);
            }
            catch (Throwable e) {
                logger.logWarn("Unexpected error happens during insertRows: {}", e);
                error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
                response.addError(error);
            }
            ++rowIndex;
            if (this.rowCount == Integer.MAX_VALUE) {
                throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
            }
        }
        return addedSizeInBytes;
    }

    /**
     * Stages the whole batch through {@code addTempRow} and commits it to the actual buffer only
     * if every row validates and the resulting row count stays below {@link Integer#MAX_VALUE}.
     * Must be called with the flush lock held.
     *
     * @return total size reported by {@code addTempRow} for the batch
     */
    private float insertRowsAbortOnError(Iterable<Map<String, Object>> rows) {
        float stagedSizeInBytes = 0.0f;
        int stagedRowCount = 0;
        for (Map<String, Object> row : rows) {
            Set<String> inputColumnNames = this.verifyInputColumns(row, null);
            stagedSizeInBytes += this.addTempRow(row, stagedRowCount, this.tempStatsMap, inputColumnNames);
            ++stagedRowCount;
        }
        // Check the limit BEFORE moving rows: failing after the move would leave rows in the
        // actual buffer that neither rowCount nor statsMap account for.
        if ((long)this.rowCount + (long)stagedRowCount >= Integer.MAX_VALUE) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
        }
        this.moveTempRowsToActualBuffer(stagedRowCount);
        this.rowCount += stagedRowCount;
        // Fold the staged statistics into the live statistics, column by column.
        this.statsMap.replaceAll((colName, stats) -> RowBufferStats.getCombinedStats(stats, this.tempStatsMap.get(colName)));
        return stagedSizeInBytes;
    }

    /**
     * Hands over everything currently buffered and resets the buffer.
     *
     * <p>The row count is checked once without the lock as a cheap early exit and again after
     * acquiring the lock; snapshot, metadata capture and {@link #reset()} all happen under the
     * lock.
     *
     * @param filePath passed through to {@link #getSnapshot(String)}
     * @return the captured data, or {@code null} if the buffer is empty or the snapshot is absent
     */
    @Override
    public ChannelData<T> flush(String filePath) {
        logger.logDebug("Start get data for channel={}", this.channelFullyQualifiedName);
        if (this.rowCount <= 0) {
            return null;
        }

        Optional<T> oldData = Optional.empty();
        int oldRowCount = 0;
        float oldBufferSize = 0.0f;
        long oldRowSequencer = 0L;
        String oldOffsetToken = null;
        HashMap<String, RowBufferStats> oldColumnEps = null;
        Pair<Long, Long> oldMinMaxInsertTimeInMs = null;

        logger.logDebug("Arrow buffer flush about to take lock on channel={}", this.channelFullyQualifiedName);
        this.flushLock.lock();
        try {
            // Re-check under the lock: another flush may have emptied the buffer in the meantime.
            if (this.rowCount > 0) {
                oldData = this.getSnapshot(filePath);
                oldRowCount = this.rowCount;
                oldBufferSize = this.bufferSize;
                oldRowSequencer = this.channelState.incrementAndGetRowSequencer();
                oldOffsetToken = this.channelState.getOffsetToken();
                oldColumnEps = new HashMap<String, RowBufferStats>(this.statsMap);
                oldMinMaxInsertTimeInMs = new Pair<Long, Long>(this.channelState.getFirstInsertInMs(), this.channelState.getLastInsertInMs());
                this.reset();
            }
        }
        finally {
            this.flushLock.unlock();
        }
        logger.logDebug("Arrow buffer flush released lock on channel={}, rowCount={}, bufferSize={}", this.channelFullyQualifiedName, oldRowCount, oldBufferSize);

        if (!oldData.isPresent()) {
            return null;
        }
        ChannelData<T> data = new ChannelData<>();
        data.setVectors(oldData.get());
        data.setRowCount(oldRowCount);
        data.setBufferSize(oldBufferSize);
        data.setRowSequencer(oldRowSequencer);
        data.setOffsetToken(oldOffsetToken);
        data.setColumnEps(oldColumnEps);
        data.setMinMaxInsertTimeInMs(oldMinMaxInsertTimeInMs);
        data.setFlusherFactory(this::createFlusher);
        return data;
    }

    /** Returns whether the buffer knows a column with the given (unquoted) name. */
    abstract boolean hasColumn(String var1);

    /**
     * Adds one row directly to the actual buffer.
     *
     * @param var1 the row, keyed by column name
     * @param var2 index at which the row is added (callers pass the current row count)
     * @param var3 statistics map to update
     * @param var4 unquoted names of the columns present in the row
     * @return size of the added row, as accumulated into the buffer size
     */
    abstract float addRow(Map<String, Object> var1, int var2, Map<String, RowBufferStats> var3, Set<String> var4);

    /**
     * Adds one row to the temporary staging area used by the abort-on-error insert path.
     * Parameters and return value mirror {@link #addRow}.
     */
    abstract float addTempRow(Map<String, Object> var1, int var2, Map<String, RowBufferStats> var3, Set<String> var4);

    /** Moves the given number of staged rows into the actual buffer. */
    abstract void moveTempRowsToActualBuffer(int var1);

    /** Discards all staged rows; called after every {@link #insertRows} invocation. */
    abstract void clearTempRows();

    /** Returns whether any columns have been set up; {@link #insertRows} fails when this is false. */
    abstract boolean hasColumns();

    /**
     * Empties the bookkeeping: zero rows, zero size, and a fresh {@link RowBufferStats} per
     * column that keeps only the display name and collation definition.
     */
    void reset() {
        this.rowCount = 0;
        this.bufferSize = 0.0f;
        this.statsMap.replaceAll((key, value) -> new RowBufferStats(value.getColumnDisplayName(), value.getCollationDefinitionString()));
    }

    /** Captures the buffered data for {@link #flush(String)}; the argument is the flush file path. */
    abstract Optional<T> getSnapshot(String var1);

    @VisibleForTesting
    abstract Object getVectorValueAt(String var1, int var2);

    @VisibleForTesting
    abstract int getTempRowCount();

    /** Releases subclass-owned resources; invoked by {@link #close(String)} before the allocator is closed. */
    abstract void closeInternal();

    /**
     * Closes the buffer and its allocator.
     *
     * @param name caller identification, used only in the log line
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} if the allocator still held memory
     *     before {@link #closeInternal()} ran while the channel is still valid
     */
    @Override
    public synchronized void close(String name) {
        long allocatedBeforeRelease = this.allocator.getAllocatedMemory();
        this.closeInternal();
        long allocatedAfterRelease = this.allocator.getAllocatedMemory();
        logger.logInfo("Trying to close {} for channel={} from function={}, allocatedBeforeRelease={}, allocatedAfterRelease={}", this.getClass().getSimpleName(), this.channelFullyQualifiedName, name, allocatedBeforeRelease, allocatedAfterRelease);
        Utils.closeAllocator(this.allocator);
        // The check deliberately uses the pre-release figure: a valid channel is expected to have
        // been drained before close is called.
        if (allocatedBeforeRelease > 0L && this.channelState.isValid()) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, String.format("Memory leaked=%d by allocator=%s, channel=%s", allocatedBeforeRelease, this.allocator, this.channelFullyQualifiedName));
        }
    }

    /**
     * Builds an {@link EpInfo} from per-column statistics.
     *
     * @param rowCount number of rows the statistics describe
     * @param colStats statistics per column; entries are keyed in the result by each statistic's
     *     display name rather than by the map key
     * @return the populated {@link EpInfo}, after {@link EpInfo#verifyEpInfo()} has run
     */
    static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> colStats) {
        EpInfo epInfo = new EpInfo(rowCount, new HashMap<String, FileColumnProperties>());
        for (RowBufferStats stat : colStats.values()) {
            epInfo.getColumnEps().put(stat.getColumnDisplayName(), new FileColumnProperties(stat));
        }
        epInfo.verifyEpInfo();
        return epInfo;
    }

    /**
     * Creates the row buffer implementation matching a BDEC version.
     *
     * @param bdecVersion {@code ONE} selects {@link ArrowRowBuffer}, {@code THREE} selects
     *     {@link ParquetRowBuffer}
     * @param bufferForTests forwarded to {@link ParquetRowBuffer} only
     * @param enableParquetMemoryOptimization forwarded to {@link ParquetRowBuffer} only
     * @return the new buffer
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} for any other version
     */
    // The concrete buffers fix their own snapshot type, so the conversion to the caller's T cannot
    // be checked by the compiler; the caller is responsible for requesting the matching T.
    @SuppressWarnings("unchecked")
    static <T> AbstractRowBuffer<T> createRowBuffer(OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, BufferAllocator allocator, Constants.BdecVersion bdecVersion, String fullyQualifiedChannelName, Consumer<Float> rowSizeMetric, ChannelRuntimeState channelRuntimeState, boolean bufferForTests, boolean enableParquetMemoryOptimization) {
        switch (bdecVersion) {
            case ONE: {
                return (AbstractRowBuffer<T>) new ArrowRowBuffer(onErrorOption, defaultTimezone, allocator, fullyQualifiedChannelName, rowSizeMetric, channelRuntimeState);
            }
            case THREE: {
                return (AbstractRowBuffer<T>) new ParquetRowBuffer(onErrorOption, defaultTimezone, allocator, fullyQualifiedChannelName, rowSizeMetric, channelRuntimeState, bufferForTests, enableParquetMemoryOptimization);
            }
            default: {
                throw new SFException(ErrorCode.INTERNAL_ERROR, "Unsupported BDEC format version: " + bdecVersion);
            }
        }
    }

    /**
     * Physical column types. {@link #getOrdinal()} returns the explicitly assigned number, which
     * is unrelated to {@link Enum#ordinal()}; constants declared without a number report
     * {@code -1}. NOTE(review): the numbers presumably mirror server-side type ids; confirm.
     */
    static enum ColumnPhysicalType {
        ROWINDEX(9),
        DOUBLE(7),
        SB1(1),
        SB2(2),
        SB4(3),
        SB8(4),
        SB16(5),
        LOB(8),
        BINARY,
        ROW(10);

        private final int ordinal;

        private ColumnPhysicalType() {
            this(INVALID_SERVER_SIDE_DATA_TYPE_ORDINAL);
        }

        private ColumnPhysicalType(int ordinal) {
            this.ordinal = ordinal;
        }

        /** Returns the explicitly assigned number, or {@code -1} if none was declared. */
        public int getOrdinal() {
            return this.ordinal;
        }
    }

    /**
     * Logical column types. {@link #getOrdinal()} returns the explicitly assigned number, which
     * is unrelated to {@link Enum#ordinal()}; constants declared without a number report
     * {@code -1}. {@link #isObject()} is true only for {@code ARRAY}, {@code OBJECT} and
     * {@code VARIANT}. NOTE(review): the numbers presumably mirror server-side type ids; confirm.
     */
    static enum ColumnLogicalType {
        ANY,
        BOOLEAN(1),
        ROWINDEX,
        NULL(15),
        REAL(8),
        FIXED(2),
        TEXT(9),
        CHAR,
        BINARY(10),
        DATE(7),
        TIME(6),
        TIMESTAMP_LTZ(3),
        TIMESTAMP_NTZ(4),
        TIMESTAMP_TZ(5),
        INTERVAL,
        RAW,
        ARRAY(13, true),
        OBJECT(12, true),
        VARIANT(11, true),
        ROW,
        SEQUENCE,
        FUNCTION,
        USER_DEFINED_TYPE;

        private final int ordinal;
        private final boolean object;

        private ColumnLogicalType() {
            this(INVALID_SERVER_SIDE_DATA_TYPE_ORDINAL);
        }

        private ColumnLogicalType(int ordinal) {
            this(ordinal, false);
        }

        private ColumnLogicalType(int ordinal, boolean object) {
            this.ordinal = ordinal;
            this.object = object;
        }

        /** Returns the explicitly assigned number, or {@code -1} if none was declared. */
        public int getOrdinal() {
            return this.ordinal;
        }

        /** Returns the flag passed as the second constructor argument ({@code false} by default). */
        public boolean isObject() {
            return this.object;
        }
    }
}

