/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin.ranged;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.CoreOperators;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OneToManyGroupedProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(OneToManyGroupedProcessor.class);
    private final boolean debug;
    private final String storeName;
    private final String groupedStoreName;
    private final boolean optional;
    private KeyValueStore<String, ReplicationMessage> groupedLookupStore;
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private final BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> joinFunction;
    private ProcessorContext<String, ReplicationMessage> context;

    public OneToManyGroupedProcessor(String storeName, String groupedStoreName, boolean optional, boolean debug) {
        this.storeName = storeName;
        this.groupedStoreName = groupedStoreName;
        this.optional = optional;
        this.joinFunction = CoreOperators.getListJoinFunctionToParam((boolean)false);
        this.debug = debug;
    }

    public void init(ProcessorContext<String, ReplicationMessage> context) {
        this.context = context;
        this.lookupStore = (KeyValueStore)context.getStateStore(this.storeName);
        this.groupedLookupStore = (KeyValueStore)context.getStateStore(this.groupedStoreName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        String key = (String)record.key();
        ReplicationMessage msg = (ReplicationMessage)record.value();
        boolean reverse = false;
        if (key.endsWith("_REV_")) {
            reverse = true;
            key = key.substring(0, key.length() - "_REV_".length());
        }
        if (reverse) {
            this.reverseJoin(key, msg, record.timestamp());
        } else {
            if (msg == null) {
                logger.debug("O2M Emitting null message with key: {}", (Object)key);
                this.context.forward(new Record((Object)key, null, record.timestamp()));
                return;
            }
            this.forwardJoin(key, msg, record.timestamp());
        }
    }

    private void forwardJoin(String key, ReplicationMessage msg, long timestamp) {
        ArrayList<ReplicationMessage> msgs = new ArrayList<ReplicationMessage>();
        try (KeyValueIterator it = this.groupedLookupStore.range((Object)(key + "|"), (Object)(key + "}"));){
            while (it.hasNext()) {
                KeyValue keyValue = (KeyValue)it.next();
                msgs.add((ReplicationMessage)keyValue.value);
            }
        }
        ReplicationMessage joined = msg;
        if (msgs.size() > 0 || this.optional) {
            joined = this.joinFunction.apply(msg, msgs);
        }
        if (this.optional || msgs.size() > 0) {
            this.forwardMessage(key, joined, timestamp);
        } else {
            this.forwardMessage(key, joined.withOperation(ReplicationMessage.Operation.DELETE), timestamp);
        }
    }

    private void reverseJoin(String key, ReplicationMessage msg, long timestamp) {
        String actualKey = CoreOperators.ungroupKeyReverse((String)key);
        ReplicationMessage one = (ReplicationMessage)this.lookupStore.get((Object)actualKey);
        if (this.debug) {
            long storeSize = this.lookupStore.approximateNumEntries();
            logger.info("# of elements in reverse store: {}", (Object)storeSize);
        }
        if (one == null) {
            return;
        }
        this.forwardJoin(actualKey, one, timestamp);
    }

    private void forwardMessage(String key, ReplicationMessage innerMessage, long timestamp) {
        this.context.forward(new Record((Object)key, (Object)innerMessage, timestamp));
        if (innerMessage.operation() == ReplicationMessage.Operation.DELETE) {
            logger.debug("Delete forwarded, appending null forward with key: {}", (Object)key);
            this.context.forward(new Record((Object)key, null, timestamp));
        }
    }
}

