/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import java.util.Collections;
import java.util.HashMap;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DiffProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String lookupStoreName;
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private static final Logger logger = LoggerFactory.getLogger(DiffProcessor.class);
    private ProcessorContext<String, ReplicationMessage> context;

    public DiffProcessor(String lookupStoreName) {
        this.lookupStoreName = lookupStoreName;
    }

    public void init(ProcessorContext context) {
        this.context = context;
        this.lookupStore = (KeyValueStore)context.getStateStore(this.lookupStoreName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        ReplicationMessage incoming = (ReplicationMessage)record.value();
        String key = (String)record.key();
        if (incoming == null || incoming.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete detected in store: {} with key: {}", (Object)this.lookupStoreName, (Object)key);
            ReplicationMessage previous = (ReplicationMessage)this.lookupStore.get((Object)key);
            if (previous != null) {
                this.lookupStore.delete((Object)key);
                ReplicationMessage forwarding = this.createMessage(key).withSubMessage("old", previous.message()).withOperation(ReplicationMessage.Operation.DELETE);
                this.context.forward(record.withValue((Object)forwarding));
            }
        } else {
            ReplicationMessage previous = (ReplicationMessage)this.lookupStore.get((Object)key);
            if (previous != null) {
                boolean isDifferent = this.diff(previous, incoming);
                if (isDifferent) {
                    this.lookupStore.put((Object)key, (Object)incoming);
                    ReplicationMessage forwarding = this.createMessage(key).withSubMessage("old", previous.message()).withSubMessage("new", incoming.message()).withOperation(ReplicationMessage.Operation.UPDATE);
                    this.context.forward(record.withValue((Object)forwarding));
                } else {
                    logger.debug("Ignoring identical message for key: {} for store: {}", (Object)key, (Object)this.lookupStoreName);
                }
            } else {
                this.lookupStore.put((Object)key, (Object)incoming);
                ReplicationMessage forwarding = this.createMessage(key).withSubMessage("new", incoming.message()).withOperation(ReplicationMessage.Operation.UPDATE);
                this.context.forward(record.withValue((Object)forwarding));
            }
            this.lookupStore.put((Object)key, (Object)incoming);
        }
    }

    public void close() {
    }

    private ReplicationMessage createMessage(String key) {
        HashMap<String, String> value = new HashMap<String, String>();
        value.put("key", key);
        HashMap<String, ImmutableMessage.ValueType> types = new HashMap<String, ImmutableMessage.ValueType>();
        types.put("key", ImmutableMessage.ValueType.STRING);
        return ReplicationFactory.fromMap((String)key, value, types).withPrimaryKeys(Collections.singletonList("key"));
    }

    private boolean diff(ReplicationMessage previous, ReplicationMessage incoming) {
        return !previous.equalsToMessage(incoming);
    }
}

