/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin;

import io.floodplain.replication.api.ReplicationMessage;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class StoreProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String lookupStoreName;
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private ProcessorContext<String, ReplicationMessage> context;

    public StoreProcessor(String lookupStoreName) {
        this.lookupStoreName = lookupStoreName;
    }

    public void init(ProcessorContext<String, ReplicationMessage> context) {
        this.context = context;
        this.lookupStore = (KeyValueStore)context.getStateStore(this.lookupStoreName);
    }

    public void close() {
    }

    public void process(Record<String, ReplicationMessage> record) {
        ReplicationMessage outerMessage = (ReplicationMessage)record.value();
        String key = (String)record.key();
        if (outerMessage == null || outerMessage.operation() == ReplicationMessage.Operation.DELETE) {
            ReplicationMessage previous = (ReplicationMessage)this.lookupStore.get((Object)key);
            if (previous != null) {
                this.lookupStore.delete((Object)key);
                this.context.forward(record.withValue((Object)previous.withOperation(ReplicationMessage.Operation.DELETE)));
            }
            Record out = new Record((Object)key, null, record.timestamp());
            this.context.forward(out);
        } else {
            this.lookupStore.put((Object)key, (Object)outerMessage);
            this.context.forward(record);
        }
    }
}

