/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.CoreOperators;
import java.util.ArrayList;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Streams processor that joins records arriving on a "forward" side with
 * a single record looked up on a "reverse" side.
 *
 * <p>Incoming keys are dispatched in {@link #process(Record)}:
 * <ul>
 *   <li>keys ending in {@code "_REV_"} are treated as updates of the reverse
 *       side; the suffix is stripped and every matching entry of the forward
 *       store is re-joined and forwarded;</li>
 *   <li>all other keys are treated as forward-side updates and are joined with
 *       the single matching entry of the reverse store.</li>
 * </ul>
 *
 * <p>The two state stores are resolved in {@link #init(ProcessorContext)} as
 * {@code "STORE_" + fromProcessor} (forward side) and
 * {@code "STORE_" + withProcessor} (reverse side).
 *
 * <p>When {@code optional} is {@code true}, forward-side messages without a
 * reverse-side counterpart are forwarded unjoined; otherwise they are
 * forwarded as a DELETE followed by a {@code null}-valued record.
 */
public class ManyToOneGroupedProcessor
        implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ManyToOneGroupedProcessor.class);

    /** Key suffix that marks a record as coming from the reverse side of the join. */
    private static final String REVERSE_SUFFIX = "_REV_";
    /** Prefix prepended to a processor name to obtain the name of its state store. */
    private static final String STORE_PREFIX = "STORE_";

    private final String fromProcessorName;
    private final String withProcessorName;
    private final boolean optional;
    /** Attaches the second message's payload to the first one as its param message. */
    private final BiFunction<ReplicationMessage, ReplicationMessage, ReplicationMessage> joinFunction =
            (a, b) -> a.withParamMessage(b.message());
    private KeyValueStore<String, ReplicationMessage> forwardLookupStore;
    private KeyValueStore<String, ReplicationMessage> reverseLookupStore;
    private ProcessorContext<String, ReplicationMessage> context;

    /**
     * @param fromProcessor name of the processor whose store holds the forward-side messages
     * @param withProcessor name of the processor whose store holds the reverse-side messages
     * @param optional      whether forward-side messages without a reverse-side match are
     *                      still forwarded (unjoined) instead of being forwarded as deletes
     */
    public ManyToOneGroupedProcessor(String fromProcessor, String withProcessor, boolean optional) {
        this.fromProcessorName = fromProcessor;
        this.withProcessorName = withProcessor;
        this.optional = optional;
    }

    /**
     * Stores the context and resolves both lookup stores from it.
     *
     * @param context the processor context supplied by Kafka Streams
     */
    public void init(ProcessorContext<String, ReplicationMessage> context) {
        this.context = context;
        this.forwardLookupStore = context.getStateStore(STORE_PREFIX + this.fromProcessorName);
        this.reverseLookupStore = context.getStateStore(STORE_PREFIX + this.withProcessorName);
    }

    /**
     * Dispatches the record to the reverse or the forward join, depending on
     * whether its key carries the {@code "_REV_"} suffix.
     *
     * @param record the incoming record; its value may be {@code null}
     * @throws RuntimeException if the key contains a <code>{</code> character
     */
    public void process(Record<String, ReplicationMessage> record) {
        String key = record.key();
        ReplicationMessage message = record.value();
        if (key.contains("{")) {
            throw new RuntimeException("Failed. bad key: " + key);
        }
        if (key.endsWith(REVERSE_SUFFIX)) {
            String reverseKey = key.substring(0, key.length() - REVERSE_SUFFIX.length());
            this.reverseJoin(reverseKey, message, record.timestamp());
        } else {
            this.forwardJoin(key, message, record.timestamp());
        }
    }

    /**
     * Handles an update of the reverse side: every forward-side entry grouped
     * under {@code key} is joined with {@code message} and forwarded.
     *
     * @param key       reverse-side key, with the {@code "_REV_"} suffix already removed
     * @param message   the reverse-side message, or {@code null}
     * @param timestamp timestamp of the triggering record, used for all forwarded records
     */
    private void reverseJoin(String key, ReplicationMessage message, long timestamp) {
        if (message == null) {
            logger.debug("reverseJoin joinGrouped emitting null message with key: {} ", key);
            this.forwardTombstone(key, timestamp);
            return;
        }
        if (message.operation() == ReplicationMessage.Operation.DELETE) {
            this.reverseJoinDelete(key, message, timestamp);
            return;
        }
        ReplicationMessage withOperation = message.withOperation(message.operation());
        try (KeyValueIterator<String, ReplicationMessage> it = this.forwardEntriesFor(key)) {
            while (it.hasNext()) {
                KeyValue<String, ReplicationMessage> entry = it.next();
                String parentKey = CoreOperators.ungroupKey(entry.key);
                ReplicationMessage joined = this.joinFunction.apply(entry.value, withOperation);
                this.forwardMessage(parentKey, joined, timestamp);
            }
        }
    }

    /**
     * Handles a DELETE of the reverse side.
     *
     * <p>For an optional join the forward-side entries are re-forwarded as they
     * are stored. Otherwise each entry is forwarded as a DELETE and removed
     * from the forward store once the iteration has finished.
     *
     * <p>Forwarded records carry the triggering record's timestamp, the same
     * one {@link #reverseJoin} and {@link #forwardJoin} use, rather than the
     * timestamp embedded in the message payload.
     *
     * @param key       reverse-side key, with the {@code "_REV_"} suffix already removed
     * @param message   the reverse-side DELETE message
     * @param timestamp timestamp of the triggering record
     */
    private void reverseJoinDelete(String key, ReplicationMessage message, long timestamp) {
        logger.debug("Delete detected for key: {}", key);
        // Collected first and deleted after the iterator is closed, so the
        // store is not modified while it is being scanned.
        ArrayList<String> deleted = new ArrayList<>();
        try (KeyValueIterator<String, ReplicationMessage> it = this.forwardEntriesFor(key)) {
            while (it.hasNext()) {
                KeyValue<String, ReplicationMessage> entry = it.next();
                if (this.optional) {
                    String parentKey = CoreOperators.ungroupKey(entry.key);
                    this.forwardMessage(parentKey, entry.value, timestamp);
                    continue;
                }
                ReplicationMessage joined = this.joinFunction.apply(message, entry.value);
                String parentKey = CoreOperators.ungroupKey(entry.key);
                this.forwardMessage(
                        parentKey, joined.withOperation(ReplicationMessage.Operation.DELETE), timestamp);
                deleted.add(entry.key);
            }
        }
        for (String deletedKey : deleted) {
            this.forwardLookupStore.delete(deletedKey);
        }
    }

    /**
     * Handles an update of the forward side: looks up the reverse-side message
     * and forwards the joined result under the ungrouped key.
     *
     * @param key       grouped forward-side key
     * @param message   the forward-side message, or {@code null}
     * @param timestamp timestamp of the triggering record, used for all forwarded records
     */
    private void forwardJoin(String key, ReplicationMessage message, long timestamp) {
        String actualKey = CoreOperators.ungroupKey(key);
        String reverseLookupKey = CoreOperators.ungroupKeyReverse(key);
        if (message == null) {
            this.forwardTombstone(actualKey, timestamp);
            return;
        }
        ReplicationMessage withOperation = message.withOperation(message.operation());
        ReplicationMessage outerMessage = this.reverseLookupStore.get(reverseLookupKey);
        if (outerMessage != null) {
            this.forwardMessage(actualKey, this.joinFunction.apply(withOperation, outerMessage), timestamp);
        } else if (this.optional) {
            // No counterpart, but the join is optional: pass the message on unjoined.
            this.forwardMessage(actualKey, message, timestamp);
        } else {
            // No counterpart for a mandatory join: the result does not exist downstream.
            this.forwardMessage(
                    actualKey, message.withOperation(ReplicationMessage.Operation.DELETE), timestamp);
        }
    }

    /**
     * Opens a range scan over the forward store from {@code key + "|"} to
     * {@code key + "}"}. Since <code>}</code> is the character directly after
     * {@code |}, every key that starts with {@code key + "|"} lies lexically
     * inside these bounds. The caller must close the returned iterator.
     */
    private KeyValueIterator<String, ReplicationMessage> forwardEntriesFor(String key) {
        return this.forwardLookupStore.range(key + "|", key + "}");
    }

    /**
     * Forwards the message downstream; a DELETE is followed by a
     * {@code null}-valued record for the same key.
     */
    private void forwardMessage(String key, ReplicationMessage innerMessage, long timestamp) {
        this.context.forward(new Record<String, ReplicationMessage>(key, innerMessage, timestamp));
        if (innerMessage.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete forwarded, appending null forward with key: {}", key);
            this.forwardTombstone(key, timestamp);
        }
    }

    /** Forwards a {@code null}-valued record for the given key. */
    private void forwardTombstone(String key, long timestamp) {
        this.context.forward(new Record<String, ReplicationMessage>(key, null, timestamp));
    }
}

