/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin;

import io.floodplain.replication.api.ReplicationMessage;
import java.util.function.BiFunction;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OneToOneProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String forwardLookupStoreName;
    private final String reverseLookupStoreName;
    private KeyValueStore<String, ReplicationMessage> forwardLookupStore;
    private KeyValueStore<String, ReplicationMessage> reverseLookupStore;
    private final BiFunction<ReplicationMessage, ReplicationMessage, ReplicationMessage> joinFunction;
    private final boolean optional;
    private static final Logger logger = LoggerFactory.getLogger(OneToOneProcessor.class);
    private ProcessorContext<String, ReplicationMessage> context;

    public OneToOneProcessor(String forwardLookupStoreName, String reverseLookupStoreName, boolean optional, BiFunction<ReplicationMessage, ReplicationMessage, ReplicationMessage> joinFunction) {
        this.forwardLookupStoreName = forwardLookupStoreName;
        this.reverseLookupStoreName = reverseLookupStoreName;
        this.optional = optional;
        this.joinFunction = joinFunction;
    }

    public void init(ProcessorContext<String, ReplicationMessage> context) {
        this.context = context;
        logger.info("inner lookup Looking up: " + this.forwardLookupStoreName);
        this.forwardLookupStore = (KeyValueStore)context.getStateStore(this.forwardLookupStoreName);
        logger.info("inner lookup Looking up: " + this.reverseLookupStoreName);
        this.reverseLookupStore = (KeyValueStore)context.getStateStore(this.reverseLookupStoreName);
        logger.info("One-to-one successfully started");
    }

    public void process(Record<String, ReplicationMessage> record) {
        ReplicationMessage counterpart;
        boolean reverse = false;
        String key = (String)record.key();
        ReplicationMessage innerMessage = (ReplicationMessage)record.value();
        if (innerMessage == null) {
            this.context.forward(record.withValue(null));
            return;
        }
        KeyValueStore<String, ReplicationMessage> lookupStore = this.reverseLookupStore;
        if (key.endsWith("_REV_")) {
            reverse = true;
            key = key.substring(0, key.length() - "_REV_".length());
            lookupStore = this.forwardLookupStore;
        }
        if ((counterpart = (ReplicationMessage)lookupStore.get((Object)key)) == null) {
            if (!reverse && this.optional) {
                this.context.forward(new Record((Object)key, (Object)innerMessage, record.timestamp()));
            }
            return;
        }
        if (reverse) {
            if (innerMessage.operation() == ReplicationMessage.Operation.DELETE && !this.optional) {
                this.context.forward(new Record((Object)key, (Object)counterpart.withOperation(ReplicationMessage.Operation.DELETE), record.timestamp()));
                this.context.forward(new Record((Object)key, null, record.timestamp()));
            } else if (innerMessage.operation() == ReplicationMessage.Operation.DELETE) {
                this.context.forward(new Record((Object)key, (Object)counterpart, record.timestamp()));
            } else {
                ReplicationMessage msg = this.joinFunction.apply(counterpart, innerMessage);
                this.context.forward(new Record((Object)key, (Object)msg, record.timestamp()));
            }
        } else {
            ReplicationMessage msg = this.joinFunction.apply(innerMessage, counterpart);
            this.context.forward(new Record((Object)key, (Object)msg, record.timestamp()));
        }
    }
}

