/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HistoryProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String lookupStoreName;
    private final String keyCounterStoreName;
    private ProcessorContext<String, ReplicationMessage> context;
    private static final Logger logger = LoggerFactory.getLogger(HistoryProcessor.class);
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private KeyValueStore<String, Long> keyCountStore;

    public HistoryProcessor(String lookupStoreName, String keyCounterStoreName) {
        this.lookupStoreName = lookupStoreName;
        this.keyCounterStoreName = keyCounterStoreName;
    }

    public void init(ProcessorContext<String, ReplicationMessage> context) {
        this.context = context;
        this.lookupStore = (KeyValueStore)context.getStateStore(this.lookupStoreName);
        this.keyCountStore = (KeyValueStore)context.getStateStore(this.keyCounterStoreName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        String key = (String)record.key();
        if (record.value() == null) {
            this.processDelete(key);
            return;
        }
        if (((ReplicationMessage)record.value()).operation() == ReplicationMessage.Operation.DELETE) {
            this.processDelete(key);
            return;
        }
        Long keyCount = (Long)this.keyCountStore.get((Object)key);
        if (keyCount == null) {
            keyCount = 0L;
            this.keyCountStore.put((Object)key, (Object)keyCount);
        } else {
            keyCount = keyCount + 1L;
        }
        String formatedKey = String.format("%08d", keyCount);
        String groupedKey = key + "|" + formatedKey;
        this.lookupStore.put((Object)groupedKey, (Object)((ReplicationMessage)record.value()));
        this.forwardHistory(key);
    }

    private void processDelete(String key) {
        ArrayList<String> toBeDeleted = new ArrayList<String>();
        try (KeyValueIterator it = this.lookupStore.range((Object)(key + "|"), (Object)(key + "}"));){
            while (it.hasNext()) {
                KeyValue keyValue = (KeyValue)it.next();
                toBeDeleted.add((String)keyValue.key);
            }
        }
        for (String k : toBeDeleted) {
            this.lookupStore.delete((Object)k);
        }
        ReplicationMessage result = ReplicationFactory.empty().withSubMessages("list", Collections.emptyList());
        this.context.forward(new Record((Object)key, (Object)result, result.timestamp()));
    }

    private void forwardHistory(String key) {
        logger.warn("whoop");
        ArrayList<ImmutableMessage> history = new ArrayList<ImmutableMessage>();
        try (KeyValueIterator it = this.lookupStore.range((Object)(key + "|"), (Object)(key + "}"));){
            while (it.hasNext()) {
                KeyValue keyValue = (KeyValue)it.next();
                history.add(((ReplicationMessage)keyValue.value).message());
            }
        }
        ReplicationMessage result = ReplicationFactory.empty().withSubMessages("list", history);
        this.context.forward(new Record((Object)key, (Object)result, result.timestamp()));
    }
}

