/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import java.util.function.Function;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupedUpdateProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String mappingStoreName;
    private final String lookupStoreName;
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private KeyValueStore<String, ReplicationMessage> mappingStore;
    private final Function<ReplicationMessage, String> keyExtract;
    private final boolean log;
    private static final Logger logger = LoggerFactory.getLogger(GroupedUpdateProcessor.class);
    private ProcessorContext<String, ReplicationMessage> context;

    public GroupedUpdateProcessor(String lookupStoreName, Function<ReplicationMessage, String> keyExtract, String mappingStoreName) {
        this.lookupStoreName = lookupStoreName;
        this.mappingStoreName = mappingStoreName;
        this.keyExtract = keyExtract;
        this.log = false;
    }

    public void init(ProcessorContext<String, ReplicationMessage> context) {
        this.context = context;
        this.lookupStore = (KeyValueStore)context.getStateStore(this.lookupStoreName);
        this.mappingStore = (KeyValueStore)context.getStateStore(this.mappingStoreName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        String key = (String)record.key();
        ReplicationMessage msg = (ReplicationMessage)record.value();
        if (msg != null) {
            String assembled = this.assembleGroupedKey(key, msg);
            if (this.log) {
                logger.info("Processor: {}, Assembling key. original: {} assembled: {}", new Object[]{this.lookupStoreName, key, assembled});
            }
            if (msg.operation() == ReplicationMessage.Operation.DELETE) {
                this.lookupStore.delete((Object)assembled);
                this.mappingStore.delete((Object)key);
            } else {
                String previousAssembled;
                ReplicationMessage previousVersion = (ReplicationMessage)this.mappingStore.get((Object)key);
                if (previousVersion != null && !assembled.equals(previousAssembled = this.assembleGroupedKey(key, previousVersion))) {
                    this.lookupStore.delete((Object)previousAssembled);
                }
                this.lookupStore.put((Object)assembled, (Object)msg.now());
                this.mappingStore.put((Object)key, (Object)msg);
            }
            this.context.forward(new Record((Object)assembled, (Object)msg.now(), record.timestamp()));
        }
    }

    public void close() {
    }

    private String assembleGroupedKey(String key, ReplicationMessage msg) {
        String extracted = this.keyExtract.apply(msg);
        if (extracted.indexOf(124) != -1) {
            throw new IllegalArgumentException("Can't prefix key. Already a grouped key: " + extracted + " grouping with: " + key);
        }
        if (key.indexOf(124) != -1) {
            throw new IllegalArgumentException("Can't prefix with key. Already a grouped key: " + key + " prepending with: " + extracted);
        }
        return extracted + "|" + key;
    }
}

