/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.immutable.factory.ImmutableFactory;
import io.floodplain.replication.api.ReplicationMessage;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReduceReadProcessor
implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private static final Logger logger = LoggerFactory.getLogger(ReduceReadProcessor.class);
    private final String accumulatorStoreName;
    private final String inputStoreName;
    private final Function<ImmutableMessage, ImmutableMessage> initial;
    private final Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> keyExtractor;
    private KeyValueStore<String, ImmutableMessage> accumulatorStore;
    private KeyValueStore<String, ReplicationMessage> inputStore;
    private ProcessorContext<String, ReplicationMessage> context;

    public ReduceReadProcessor(String inputStoreName, String accumulatorStoreName, Function<ImmutableMessage, ImmutableMessage> initial, Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> keyExtractor) {
        this.accumulatorStoreName = accumulatorStoreName;
        this.inputStoreName = inputStoreName;
        this.initial = initial;
        this.keyExtractor = keyExtractor;
    }

    public void init(ProcessorContext context) {
        this.context = context;
        this.accumulatorStore = (KeyValueStore)context.getStateStore(this.accumulatorStoreName);
        this.inputStore = (KeyValueStore)context.getStateStore(this.inputStoreName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        String extracted;
        ReplicationMessage inputValue = (ReplicationMessage)record.value();
        String key = (String)record.key();
        ReplicationMessage stored = (ReplicationMessage)this.inputStore.get((Object)key);
        if (stored == null) {
            if (inputValue == null || inputValue.operation() == ReplicationMessage.Operation.DELETE) {
                logger.warn("Issue: Deleting (?) a message that isn't there. Is this bad? Dropping message. key: {} inputvalue: {}", (Object)key, (Object)inputValue);
                return;
            }
            extracted = this.keyExtractor.orElse((m, s) -> "singlerestore").apply(inputValue.message(), inputValue.paramMessage().orElse(ImmutableFactory.empty()));
        } else {
            extracted = this.keyExtractor.orElse((m, s) -> "singlerestore").apply(stored.message(), stored.paramMessage().orElse(ImmutableFactory.empty()));
        }
        ImmutableMessage storedAccumulator = (ImmutableMessage)this.accumulatorStore.get((Object)extracted);
        ReplicationMessage value = inputValue;
        this.inputStore.put((Object)key, (Object)inputValue);
        if (inputValue == null || inputValue.operation() == ReplicationMessage.Operation.DELETE) {
            if (stored == null) {
                throw new RuntimeException("Issue: Deleting a message that isn't there. Is this bad?");
            }
            ImmutableMessage param = storedAccumulator == null ? this.initial.apply(stored.message()) : storedAccumulator;
            value = stored.withOperation(ReplicationMessage.Operation.DELETE).withParamMessage(param);
            this.inputStore.delete((Object)key);
        } else {
            if (storedAccumulator == null) {
                storedAccumulator = this.initial.apply(inputValue.message());
            }
            value = value.withParamMessage(storedAccumulator);
            if (stored != null) {
                this.context.forward(new Record((Object)extracted, (Object)stored.withOperation(ReplicationMessage.Operation.DELETE).withParamMessage(storedAccumulator != null ? storedAccumulator : this.initial.apply(inputValue.message())), record.timestamp()));
                storedAccumulator = (ImmutableMessage)this.accumulatorStore.get((Object)extracted);
            }
            value = value.withParamMessage(storedAccumulator);
        }
        this.context.forward(new Record((Object)extracted, (Object)value, record.timestamp()));
    }
}

