/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.CoreOperators;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManyToManyGroupedProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ManyToManyGroupedProcessor.class);
    private final String fromProcessorName;
    private final String withProcessorName;
    private final boolean optional;
    private final BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> manyToManyJoinFunction;
    private KeyValueStore<String, ReplicationMessage> forwardLookupStore;
    private KeyValueStore<String, ReplicationMessage> reverseLookupStore;
    private ProcessorContext<String, ReplicationMessage> context;

    public ManyToManyGroupedProcessor(String fromProcessor, String withProcessor, boolean optional) {
        this.fromProcessorName = fromProcessor;
        this.withProcessorName = withProcessor;
        this.optional = optional;
        this.manyToManyJoinFunction = CoreOperators.getListJoinFunctionToParam((boolean)false);
    }

    public void process(Record<String, ReplicationMessage> record) {
        boolean reverse = false;
        String key = (String)record.key();
        ReplicationMessage message = (ReplicationMessage)record.value();
        if (key.endsWith("_REV_")) {
            reverse = true;
            key = key.substring(0, key.length() - "_REV_".length());
        }
        if (reverse) {
            this.reverseJoin(key, message, record.timestamp());
        } else {
            this.forwardJoin(key, message, record.timestamp());
        }
    }

    public void init(ProcessorContext<String, ReplicationMessage> context) {
        this.context = context;
        this.forwardLookupStore = (KeyValueStore)context.getStateStore("STORE_" + this.fromProcessorName);
        this.reverseLookupStore = (KeyValueStore)context.getStateStore("STORE_" + this.withProcessorName);
    }

    private void reverseJoin(String key, ReplicationMessage message, long timestamp) {
        String actualKey = CoreOperators.ungroupKeyReverse((String)key);
        if (message == null) {
            logger.info("reverseJoin joinGrouped emitting null message with key: {} ", (Object)actualKey);
            this.context.forward(new Record((Object)actualKey, null, timestamp));
            return;
        }
        try (KeyValueIterator it = this.forwardLookupStore.range((Object)(actualKey + "|"), (Object)(actualKey + "}"));){
            while (it.hasNext()) {
                KeyValue keyValue = (KeyValue)it.next();
                this.forwardJoin((String)keyValue.key, (ReplicationMessage)keyValue.value, timestamp);
            }
        }
    }

    private void forwardJoin(String key, ReplicationMessage message, long timestamp) {
        String actualKey = CoreOperators.ungroupKey((String)key);
        String reverseLookupName = CoreOperators.ungroupKeyReverse((String)key);
        if (message == null) {
            logger.info("forwardJoin joinGrouped emitting null message with key: {} ", (Object)actualKey);
            this.context.forward(new Record((Object)actualKey, null, timestamp));
            return;
        }
        if (message.operation() == ReplicationMessage.Operation.DELETE) {
            // empty if block
        }
        ReplicationMessage withOperation = message.withOperation(message.operation());
        ArrayList<ReplicationMessage> messageList = new ArrayList<ReplicationMessage>();
        try (KeyValueIterator it = this.reverseLookupStore.range((Object)(reverseLookupName + "|"), (Object)(reverseLookupName + "}"));){
            while (it.hasNext()) {
                KeyValue keyValue = (KeyValue)it.next();
                messageList.add((ReplicationMessage)keyValue.value);
            }
        }
        ReplicationMessage joined = this.manyToManyJoinFunction.apply(withOperation, messageList);
        if (this.optional || !messageList.isEmpty()) {
            this.forwardMessage(actualKey, joined, timestamp);
        } else {
            this.forwardMessage(actualKey, joined.withOperation(ReplicationMessage.Operation.DELETE), timestamp);
        }
    }

    private void forwardMessage(String key, ReplicationMessage innerMessage, long timestamp) {
        this.context.forward(new Record((Object)key, (Object)innerMessage, timestamp));
        if (innerMessage.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete forwarded, appending null forward with key: {}", (Object)key);
            this.context.forward(new Record((Object)key, null, timestamp));
        }
    }
}

