/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.ingest.streaming.internal;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;

/**
 * Two-level cache of channels keyed by fully qualified table name and then by channel name,
 * together with a per-table {@link FlushInfo} record (last flush time and need-flush flag).
 *
 * <p>Both the channel cache and the flush-info map are backed by {@link ConcurrentHashMap}.
 *
 * @param <T> type parameter forwarded to {@link SnowflakeStreamingIngestChannelInternal}
 */
class ChannelCache<T> {
    /** Fully qualified table name -> (channel name -> channel). */
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>> cache = new ConcurrentHashMap<>();

    /** Fully qualified table name -> flush bookkeeping for that table. */
    private final ConcurrentHashMap<String, FlushInfo> tableFlushInfo = new ConcurrentHashMap<>();

    ChannelCache() {
    }

    /**
     * Adds a channel to the cache, creating the per-table map and the per-table flush info on
     * first use. If a channel with the same name was already cached for the table, it is replaced
     * and the old instance is invalidated.
     *
     * @param channel the channel to add
     */
    void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
        ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channels =
                this.cache.computeIfAbsent(channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());
        // Only the first channel of a table initializes the flush info; later additions keep
        // whatever state is already recorded.
        this.tableFlushInfo.putIfAbsent(channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false));
        SnowflakeStreamingIngestChannelInternal<T> oldChannel = channels.put(channel.getName(), channel);
        if (oldChannel != null) {
            String invalidationCause = String.format("Old channel removed from cache, channelName=%s", channel.getName());
            oldChannel.invalidate("removed from cache", invalidationCause);
        }
    }

    /**
     * Returns the last flush time recorded for the given table.
     *
     * @param fullyQualifiedTableName key of the table in the flush-info map
     * @return the recorded last flush time
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} if no flush info exists for the
     *     table
     */
    Long getLastFlushTime(String fullyQualifiedTableName) {
        FlushInfo flushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
        if (flushInfo == null) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, String.format("Last flush time for table %s not found", fullyQualifiedTableName));
        }
        return flushInfo.lastFlushTime;
    }

    /**
     * Replaces the last flush time of the given table while keeping its need-flush flag.
     *
     * @param fullyQualifiedTableName key of the table in the flush-info map
     * @param lastFlushTime the new last flush time; it is unboxed to {@code long}, so a
     *     {@code null} value results in a {@link NullPointerException}
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} if no flush info exists for the
     *     table
     */
    void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
        // compute() makes the read-modify-write of the immutable FlushInfo atomic per key.
        this.tableFlushInfo.compute(fullyQualifiedTableName, (k, v) -> {
            if (v == null) {
                throw new SFException(ErrorCode.INTERNAL_ERROR, String.format("Last flush time for table %s not found", fullyQualifiedTableName));
            }
            return new FlushInfo(lastFlushTime, v.needFlush);
        });
    }

    /**
     * Returns the need-flush flag recorded for the given table.
     *
     * @param fullyQualifiedTableName key of the table in the flush-info map
     * @return the recorded need-flush flag
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} if no flush info exists for the
     *     table
     */
    boolean getNeedFlush(String fullyQualifiedTableName) {
        FlushInfo flushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
        if (flushInfo == null) {
            throw new SFException(ErrorCode.INTERNAL_ERROR, String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
        }
        return flushInfo.needFlush;
    }

    /**
     * Replaces the need-flush flag of the given table while keeping its last flush time.
     *
     * @param fullyQualifiedTableName key of the table in the flush-info map
     * @param needFlush the new flag value
     * @throws SFException with {@link ErrorCode#INTERNAL_ERROR} if no flush info exists for the
     *     table
     */
    void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) {
        // compute() makes the read-modify-write of the immutable FlushInfo atomic per key.
        this.tableFlushInfo.compute(fullyQualifiedTableName, (k, v) -> {
            if (v == null) {
                throw new SFException(ErrorCode.INTERNAL_ERROR, String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
            }
            return new FlushInfo(v.lastFlushTime, needFlush);
        });
    }

    /**
     * Returns an unmodifiable view of the table-level entries of the cache.
     *
     * @return unmodifiable set view over (table name, per-table channel map) entries
     */
    Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>> entrySet() {
        return Collections.unmodifiableSet(this.cache.entrySet());
    }

    /**
     * Returns an unmodifiable view of the fully qualified table names present in the cache.
     *
     * @return unmodifiable set view over the cached table names
     */
    Set<String> keySet() {
        return Collections.unmodifiableSet(this.cache.keySet());
    }

    /** Calls {@code markClosed()} on every cached channel; the channels stay in the cache. */
    void closeAllChannels() {
        this.cache.values().forEach(channels -> channels.values().forEach(channel -> channel.markClosed()));
    }

    /**
     * Removes the cached channel that has the same table, name and channel sequencer as the
     * given channel. If that was the last channel of its table, the table entry is removed from
     * the channel cache as well. The table's flush info is left untouched.
     *
     * @param channel the channel identifying the cache entry to remove
     */
    void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal<T> channel) {
        this.cache.computeIfPresent(channel.getFullyQualifiedTableName(), (tableName, channels) -> {
            SnowflakeStreamingIngestChannelInternal<T> channelInCache = channels.get(channel.getName());
            // Compare sequencers by value: reference comparison of boxed values is only reliable
            // for small numbers, which would silently skip the removal for larger sequencers.
            if (channelInCache == null
                    || !Objects.equals(channelInCache.getChannelSequencer(), channel.getChannelSequencer())) {
                return channels;
            }
            boolean removed = channels.remove(channel.getName()) != null;
            // Returning null from computeIfPresent drops the table entry from the outer map.
            return removed && channels.isEmpty() ? null : channels;
        });
    }

    /**
     * Invalidates the cached channel identified by table and channel name, but only if its
     * channel sequencer equals the given one. Does nothing when no such channel is cached.
     *
     * @param dbName database part of the table name
     * @param schemaName schema part of the table name
     * @param tableName table part of the table name
     * @param channelName name of the channel within the table
     * @param channelSequencer sequencer the cached channel must have to be invalidated
     * @param invalidationCause message passed on to the channel's {@code invalidate} call
     */
    void invalidateChannelIfSequencersMatch(String dbName, String schemaName, String tableName, String channelName, Long channelSequencer, String invalidationCause) {
        // The cache key is built as "db.schema.table" from the three name parts.
        String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName);
        ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channelsMapPerTable = this.cache.get(fullyQualifiedTableName);
        if (channelsMapPerTable == null) {
            return;
        }
        SnowflakeStreamingIngestChannelInternal<T> channel = channelsMapPerTable.get(channelName);
        if (channel != null && channel.getChannelSequencer().equals(channelSequencer)) {
            channel.invalidate("invalidate with matched sequencer", invalidationCause);
        }
    }

    /**
     * Returns the number of table entries in the cache (not the number of channels).
     *
     * @return size of the table-level map
     */
    int getSize() {
        return this.cache.size();
    }

    /** Immutable per-table flush bookkeeping; updates replace the whole instance. */
    static class FlushInfo {
        /** Last flush time; initialized from {@code System.currentTimeMillis()} in {@code addChannel}. */
        final long lastFlushTime;
        /** Need-flush flag for the table; starts out {@code false} in {@code addChannel}. */
        final boolean needFlush;

        FlushInfo(long lastFlushTime, boolean needFlush) {
            this.lastFlushTime = lastFlushTime;
            this.needFlush = needFlush;
        }
    }
}

