/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CacheProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private static final String CACHED_AT_KEY = "_cachedAt";
    private static final Logger logger = LoggerFactory.getLogger(CacheProcessor.class);
    private final Map<String, Record<String, ReplicationMessage>> cache;
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private ProcessorContext<String, ReplicationMessage> context;
    private final Duration cacheTime;
    private final String cacheProcName;
    private final Object sync = new Object();
    private final boolean memoryCache;
    private boolean clearPersistentCache = false;
    private final int maxSize;

    public CacheProcessor(String cacheProcName, Duration cacheTime, int maxSize, boolean inMemory) {
        this.cacheProcName = cacheProcName;
        this.cacheTime = cacheTime;
        this.cache = new ConcurrentHashMap<String, Record<String, ReplicationMessage>>();
        this.memoryCache = inMemory;
        this.maxSize = maxSize;
        logger.info("Using a cache time of {} seconds for {}", (Object)cacheTime, (Object)cacheProcName);
    }

    public void init(ProcessorContext context) {
        this.context = context;
        this.lookupStore = (KeyValueStore)context.getStateStore("STORE_" + this.cacheProcName);
        long runInterval = Math.max(this.cacheTime.toMillis() / 10L, 1000L);
        this.context.schedule(Duration.ofMillis(runInterval), PunctuationType.WALL_CLOCK_TIME, this::checkCache);
        logger.info("Created persistentCache for {} with check interval of {}ms", (Object)this.cacheProcName, (Object)runInterval);
        if (this.memoryCache) {
            this.clearPersistentCache = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(Record<String, ReplicationMessage> record) {
        String key = (String)record.key();
        ReplicationMessage message = (ReplicationMessage)record.value();
        Object object = this.sync;
        synchronized (object) {
            if (message == null || message.operation() == ReplicationMessage.Operation.DELETE) {
                this.cache.remove(key);
                this.lookupStore.delete((Object)key);
                this.context.forward(record);
            } else if (this.memoryCache) {
                if (this.cache.size() > this.maxSize) {
                    logger.warn("Reached max cache size!");
                    this.context.forward(record);
                } else {
                    this.cache.put(key, record);
                }
            } else {
                long cachedAt = record.timestamp();
                this.lookupStore.put((Object)key, (Object)message.with(CACHED_AT_KEY, (Object)cachedAt, ImmutableMessage.ValueType.LONG));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        Object object = this.sync;
        synchronized (object) {
            for (Map.Entry<String, Record<String, ReplicationMessage>> entry : this.cache.entrySet()) {
                this.context.forward(entry.getValue());
                this.cache.remove(entry.getKey());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void checkCache(long ms) {
        if (this.clearPersistentCache) {
            this.clearPersistentCache();
        }
        int entries = 0;
        int expiredEntries = 0;
        if (this.memoryCache) {
            HashSet<String> toForward = new HashSet<String>();
            for (Map.Entry<String, Record<String, ReplicationMessage>> entry : this.cache.entrySet()) {
                if (!this.isExpired(ms, entry.getValue())) continue;
                toForward.add((String)entry.getKey());
            }
            Object object = this.sync;
            synchronized (object) {
                for (String key : toForward) {
                    if (this.cache.get(key) == null) continue;
                    this.context.forward(this.cache.get(key));
                    this.cache.remove(key);
                }
            }
        }
        HashSet<String> possibleExpired = new HashSet<String>();
        try (KeyValueIterator it = this.lookupStore.all();){
            while (it.hasNext()) {
                KeyValue keyValue = (KeyValue)it.next();
                ++entries;
                long cachedAt = ((ReplicationMessage)keyValue.value).value(CACHED_AT_KEY).orElse(0L);
                if (ms - cachedAt < this.cacheTime.toMillis()) continue;
                possibleExpired.add((String)keyValue.key);
            }
        }
        Object object = this.sync;
        synchronized (object) {
            for (String key : possibleExpired) {
                long cachedAt;
                ReplicationMessage message = (ReplicationMessage)this.lookupStore.get((Object)key);
                if (message == null || ms - (cachedAt = message.value(CACHED_AT_KEY).orElse(0L).longValue()) < this.cacheTime.toMillis()) continue;
                ++expiredEntries;
                this.context.forward(new Record((Object)key, (Object)message.without(CACHED_AT_KEY), cachedAt));
                this.lookupStore.delete((Object)key);
            }
        }
        long duration = System.currentTimeMillis() - ms;
        if (entries > 0) {
            logger.info("Checked cache {} - {} entries, {} expired entries in {}ms", new Object[]{this.cacheProcName, entries, expiredEntries, duration});
        }
    }

    private void clearPersistentCache() {
        HashSet<String> toClear = new HashSet<String>();
        try (KeyValueIterator it = this.lookupStore.all();){
            while (it.hasNext()) {
                KeyValue next = (KeyValue)it.next();
                this.context.forward(new Record((Object)((String)next.key), (Object)((ReplicationMessage)next.value).without(CACHED_AT_KEY), ((ReplicationMessage)next.value).timestamp()));
                toClear.add((String)next.key);
            }
        }
        for (String key : toClear) {
            this.lookupStore.delete((Object)key);
        }
        this.clearPersistentCache = false;
    }

    private boolean isExpired(long timestamp, Record<String, ReplicationMessage> record) {
        return timestamp - record.timestamp() > this.cacheTime.toMillis();
    }
}

