/*
 * Decompiled with CFR 0.152.
 */
package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.reactive.source.topology.api.TopologyPipeComponent;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.remotejoin.CacheProcessor;
import io.floodplain.streams.remotejoin.DiffProcessor;
import io.floodplain.streams.remotejoin.IdentityProcessor;
import io.floodplain.streams.remotejoin.IfElseProcessor;
import io.floodplain.streams.remotejoin.OneToOneProcessor;
import io.floodplain.streams.remotejoin.PreJoinProcessor;
import io.floodplain.streams.remotejoin.PrimaryToSecondaryProcessor;
import io.floodplain.streams.remotejoin.ReduceReadProcessor;
import io.floodplain.streams.remotejoin.StoreProcessor;
import io.floodplain.streams.remotejoin.StoreStateProcessor;
import io.floodplain.streams.remotejoin.TopologyConstructor;
import io.floodplain.streams.remotejoin.ranged.GroupedUpdateProcessor;
import io.floodplain.streams.remotejoin.ranged.ManyToManyGroupedProcessor;
import io.floodplain.streams.remotejoin.ranged.ManyToOneGroupedProcessor;
import io.floodplain.streams.remotejoin.ranged.OneToManyGroupedProcessor;
import io.floodplain.streams.serializer.ConnectReplicationMessageSerde;
import io.floodplain.streams.serializer.ImmutableMessageSerde;
import io.floodplain.streams.serializer.ReplicationMessageSerde;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Stack;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicationTopologyParser {
    /** Prefix used throughout this class when deriving a state store name from a processor name. */
    public static final String STORE_PREFIX = "STORE_";
    // Value serde used for the ReplicationMessage stores built by createMessageStoreSupplier.
    private static final Serde<ReplicationMessage> messageSerde = new ReplicationMessageSerde();
    // Value serde used for the ImmutableMessage stores built by createImmutableMessageSupplier.
    private static final Serde<ImmutableMessage> immutableMessageSerde = new ImmutableMessageSerde();
    // Provides the FLOODPLAIN_JSON body serializer/deserializer (see bodySerializer/bodyDeserializer).
    private static final ReplicationMessageSerde replicationMessageSerde = new ReplicationMessageSerde();
    // Provides the CONNECT_JSON body serializer/deserializer (see bodySerializer/bodyDeserializer).
    private static final ConnectReplicationMessageSerde connectReplicationMessageSerde = new ConnectReplicationMessageSerde();
    private static final Logger logger = LoggerFactory.getLogger(ReplicationTopologyParser.class);

    // Static-only utility class: the private constructor prevents instantiation.
    private ReplicationTopologyParser() {
    }

    /**
     * Records that {@code processor} needs access to {@code stateStore}.
     *
     * <p>The map is keyed by state store name; each value is the list of processor names
     * registered for that store. A list is created on first use of a store name, and the
     * processor is appended without any duplicate check.
     *
     * @param processorStateStoreMapper mapping from state store name to processor names; mutated in place
     * @param processor name of the processor to register
     * @param stateStore name of the state store the processor should be connected to
     */
    public static void addStateStoreMapping(Map<String, List<String>> processorStateStoreMapper, String processor, String stateStore) {
        logger.info("Adding processor: {} with statestore: {}", processor, stateStore);
        processorStateStoreMapper.computeIfAbsent(stateStore, k -> new ArrayList<>()).add(processor);
    }

    /**
     * Adds every state store recorded in {@code topologyConstructor.processorStateStoreMapper}
     * to the topology and connects it to the processors registered for it.
     *
     * <p>For each store name the builder is looked up in {@code stateStoreSupplier}, then
     * {@code immutableStoreSupplier}, then {@code longStoreSupplier}; the first hit wins.
     *
     * @param topologyConstructor holder of the store-to-processor mappings and the store builders
     * @param current topology the stores are added to
     * @throws RuntimeException if a mapped store name has no builder in any of the three supplier maps
     */
    public static void materializeStateStores(TopologyConstructor topologyConstructor, Topology current) {
        for (Map.Entry<String, List<String>> mapping : topologyConstructor.processorStateStoreMapper.entrySet()) {
            String storeName = mapping.getKey();
            List<String> processorNames = mapping.getValue();

            // Topology.addStateStore accepts any StoreBuilder<?>, so a single wildcard-typed
            // lookup chain replaces three near-identical branches.
            StoreBuilder<?> storeBuilder = topologyConstructor.stateStoreSupplier.get(storeName);
            if (storeBuilder == null) {
                storeBuilder = topologyConstructor.immutableStoreSupplier.get(storeName);
            }
            if (storeBuilder == null) {
                storeBuilder = topologyConstructor.longStoreSupplier.get(storeName);
            }
            if (storeBuilder == null) {
                logger.error("Missing supplier for: {}\nStore mappings: {} available suppliers: {}", storeName, topologyConstructor.processorStateStoreMapper, topologyConstructor.immutableStoreSupplier);
                logger.error("Available state stores: {}\nimm: {}", topologyConstructor.stateStoreSupplier.keySet(), topologyConstructor.immutableStoreSupplier.keySet());
                throw new RuntimeException("Missing supplier for: " + storeName);
            }

            // addStateStore returns the topology itself, so the result does not need to be kept.
            current.addStateStore(storeBuilder, processorNames.toArray(new String[0]));
            logger.info("Added state store: {} for processors: {}", storeName, processorNames);
        }
    }

    /**
     * Adds a {@link DiffProcessor} named {@code diffProcessorNamePrefix} downstream of
     * {@code fromProcessor} and registers a persistent message store for it.
     *
     * <p>The processor and its store share the same name here; {@code STORE_PREFIX} is not applied.
     *
     * @param current topology to extend
     * @param topologyContext not used by this method
     * @param topologyConstructor receives the store mapping and the store builder
     * @param fromProcessor name of the parent processor
     * @param diffProcessorNamePrefix name used for both the new processor and its state store
     */
    public static void addDiffProcessor(Topology current, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String fromProcessor, String diffProcessorNamePrefix) {
        final String diffName = diffProcessorNamePrefix;
        current.addProcessor(diffName, () -> new DiffProcessor(diffName), fromProcessor);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, diffName, diffName);
        logger.info("Granting access for processor: {} to store: {}", diffName, diffName);
        StoreBuilder<KeyValueStore<String, ReplicationMessage>> diffStore = createMessageStoreSupplier(diffName, true);
        topologyConstructor.stateStoreSupplier.put(diffName, diffStore);
    }

    /**
     * Adds a {@link StoreProcessor} named {@code name} below {@code parentProcessor}, backed by a
     * persistent message store called {@code STORE_PREFIX + name}.
     *
     * @param currentBuilder topology to extend
     * @param context not used by this method
     * @param topologyConstructor receives the store name, the store mapping and the store builder
     * @param name name of the new store processor
     * @param parentProcessor name of the processor feeding the new one
     * @return {@code name}, unchanged
     */
    public static String addMaterializeStore(Topology currentBuilder, TopologyContext context, TopologyConstructor topologyConstructor, String name, String parentProcessor) {
        final String storeName = STORE_PREFIX + name;
        currentBuilder.addProcessor(name, () -> new StoreProcessor(storeName), parentProcessor);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, name, storeName);
        topologyConstructor.stores.add(storeName);
        topologyConstructor.stateStoreSupplier.put(storeName, createMessageStoreSupplier(storeName, true));
        return name;
    }

    /**
     * Selects the key deserializer matching a topic key format.
     *
     * @param keyFormat key format of the topic
     * @return the Connect key deserializer for {@code CONNECT_KEY_JSON}, the plain string
     *         deserializer for {@code FLOODPLAIN_STRING}
     * @throws IllegalArgumentException for any other format
     */
    public static Deserializer<String> keyDeserializer(Topic.FloodplainKeyFormat keyFormat) {
        switch (keyFormat) {
            case FLOODPLAIN_STRING:
                return Serdes.String().deserializer();
            case CONNECT_KEY_JSON:
                return ConnectReplicationMessageSerde.keyDeserialize();
            default:
                throw new IllegalArgumentException("Weird key format: " + keyFormat);
        }
    }

    /**
     * Selects the key serializer matching a topic key format.
     *
     * @param keyFormat key format of the topic
     * @return the Connect key serializer for {@code CONNECT_KEY_JSON}, the plain string
     *         serializer for {@code FLOODPLAIN_STRING}
     * @throws IllegalArgumentException for any other format
     */
    public static Serializer<String> keySerializer(Topic.FloodplainKeyFormat keyFormat) {
        switch (keyFormat) {
            case FLOODPLAIN_STRING:
                return Serdes.String().serializer();
            case CONNECT_KEY_JSON:
                return ConnectReplicationMessageSerde.keySerialize();
            default:
                throw new IllegalArgumentException("Weird key format: " + keyFormat);
        }
    }

    /**
     * Selects the message body deserializer matching a topic body format.
     *
     * @param bodyFormat body format of the topic
     * @return the deserializer of the shared Connect serde for {@code CONNECT_JSON}, or of the
     *         shared replication message serde for {@code FLOODPLAIN_JSON}
     * @throws IllegalArgumentException for any other format
     */
    public static Deserializer<ReplicationMessage> bodyDeserializer(Topic.FloodplainBodyFormat bodyFormat) {
        switch (bodyFormat) {
            case FLOODPLAIN_JSON:
                return replicationMessageSerde.deserializer();
            case CONNECT_JSON:
                return connectReplicationMessageSerde.deserializer();
            default:
                throw new IllegalArgumentException("Weird body format: " + bodyFormat);
        }
    }

    /**
     * Selects the message body serializer matching a topic body format.
     *
     * @param bodyFormat body format of the topic
     * @return the serializer of the shared Connect serde for {@code CONNECT_JSON}, or of the
     *         shared replication message serde for {@code FLOODPLAIN_JSON}
     * @throws IllegalArgumentException for any other format
     */
    public static Serializer<ReplicationMessage> bodySerializer(Topic.FloodplainBodyFormat bodyFormat) {
        switch (bodyFormat) {
            case FLOODPLAIN_JSON:
                return replicationMessageSerde.serializer();
            case CONNECT_JSON:
                return connectReplicationMessageSerde.serializer();
            default:
                throw new IllegalArgumentException("Weird body format: " + bodyFormat);
        }
    }

    /**
     * Convenience overload: resolves the deserializers for the given key and body formats and
     * delegates to the deserializer-based {@code addSourceStore}.
     *
     * @param currentBuilder topology to extend
     * @param topologyConstructor bookkeeping for sources and stores
     * @param sourceTopicName topic to read from
     * @param keyFormat key format of the topic
     * @param bodyFormat body format of the topic
     * @param materializeStore whether the source should be backed by a state store
     * @return the name of the source processor
     * @throws IllegalArgumentException if either format is not supported
     */
    public static String addSourceStore(Topology currentBuilder, TopologyConstructor topologyConstructor, Topic sourceTopicName, Topic.FloodplainKeyFormat keyFormat, Topic.FloodplainBodyFormat bodyFormat, boolean materializeStore) {
        Deserializer<String> keys = keyDeserializer(keyFormat);
        Deserializer<ReplicationMessage> bodies = bodyDeserializer(bodyFormat);
        return addSourceStore(currentBuilder, topologyConstructor, sourceTopicName, keys, bodies, materializeStore);
    }

    /**
     * Ensures a source and a first processor exist for {@code sourceTopicName} and, when
     * requested, registers a persistent message store for that processor.
     *
     * <p>The source node ({@code <processor>_src}) and the processor are only added the first
     * time a topic is seen, as tracked by {@code topologyConstructor.sources}. The processor is
     * a {@link StoreProcessor} when {@code materializeStore} is true on that first call,
     * otherwise an {@link IdentityProcessor}.
     *
     * <p>NOTE(review): if a topic was first added with {@code materializeStore == false} and is
     * later requested with {@code true}, the store mapping is still registered although the
     * existing processor was created as an IdentityProcessor - confirm callers never mix the two.
     *
     * @param currentBuilder topology to extend
     * @param topologyConstructor bookkeeping for sources, store names, mappings and builders
     * @param sourceTopicName topic to read from
     * @param keyDeserializer deserializer for record keys
     * @param bodyDeserializer deserializer for record values
     * @param materializeStore whether to back the source processor with a state store
     * @return the name of the source processor
     */
    public static String addSourceStore(Topology currentBuilder, TopologyConstructor topologyConstructor, Topic sourceTopicName, Deserializer<String> keyDeserializer, Deserializer<ReplicationMessage> bodyDeserializer, boolean materializeStore) {
        String sourceProcessorName = sourceTopicName.prefixedString("SOURCE");
        String storeName = STORE_PREFIX + sourceProcessorName;
        if (!topologyConstructor.sources.containsKey(sourceTopicName)) {
            String sourceName = sourceProcessorName + "_src";
            currentBuilder.addSource(sourceName, keyDeserializer, bodyDeserializer, sourceTopicName.qualifiedString());
            topologyConstructor.sources.put(sourceTopicName, sourceName);
            if (materializeStore) {
                currentBuilder.addProcessor(sourceProcessorName, () -> new StoreProcessor(storeName), sourceName);
            } else {
                currentBuilder.addProcessor(sourceProcessorName, IdentityProcessor::new, sourceName);
            }
        }
        if (materializeStore) {
            addStateStoreMapping(topologyConstructor.processorStateStoreMapper, sourceProcessorName, storeName);
            topologyConstructor.stores.add(storeName);
            topologyConstructor.stateStoreSupplier.put(storeName, createMessageStoreSupplier(storeName, true));
            // Log the real store name, and only when access is actually granted.
            logger.info("Granting access for processor: {} to store: {}", sourceProcessorName, storeName);
        }
        return sourceProcessorName;
    }

    /**
     * Wires a grouped join between {@code fromProcessor} and {@code withProcessor} and stores
     * the result under {@code name}.
     *
     * <p>Creates two {@link PreJoinProcessor}s ({@code name-forwardpre}, {@code name-reversepre}),
     * a join processor ({@code name-joined}) that is a {@link ManyToManyGroupedProcessor} when
     * {@code isList} is true and a {@link ManyToOneGroupedProcessor} otherwise, and finally a
     * {@link StoreProcessor} called {@code name} writing to {@code STORE_PREFIX + name}.
     *
     * <p>NOTE(review): {@code materialize} and {@code topologyContext} are not read; the result
     * store is always created - confirm this is intended.
     *
     * @param current topology to extend
     * @param topologyContext not used by this method
     * @param topologyConstructor receives store names, mappings and the result store builder
     * @param fromProcessor name of the primary-side processor
     * @param name base name for the processors created here
     * @param withProcessor name of the secondary-side processor
     * @param optional passed through to the join processor
     * @param materialize not used by this method
     * @param isList selects the many-to-many variant of the join processor
     */
    public static void addSingleJoinGrouped(Topology current, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String fromProcessor, String name, String withProcessor, boolean optional, boolean materialize, boolean isList) {
        final String forwardPre = name + "-forwardpre";
        final String reversePre = name + "-reversepre";
        final String joined = name + "-joined";
        final String fromStore = STORE_PREFIX + fromProcessor;
        final String withStore = STORE_PREFIX + withProcessor;
        final String resultStore = STORE_PREFIX + name;

        final ProcessorSupplier joinSupplier;
        if (isList) {
            joinSupplier = () -> new ManyToManyGroupedProcessor(fromProcessor, withProcessor, optional);
        } else {
            joinSupplier = () -> new ManyToOneGroupedProcessor(fromProcessor, withProcessor, optional);
        }

        current.addProcessor(forwardPre, () -> new PreJoinProcessor(false), fromProcessor)
                .addProcessor(reversePre, () -> new PreJoinProcessor(true), withProcessor)
                .addProcessor(joined, joinSupplier, forwardPre, reversePre);

        // The join processor reads both input stores; the final store processor owns the result store.
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, joined, withStore);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, joined, fromStore);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, name, resultStore);
        topologyConstructor.stores.add(withStore);
        topologyConstructor.stores.add(fromStore);
        topologyConstructor.stores.add(resultStore);
        topologyConstructor.stateStoreSupplier.put(resultStore, createMessageStoreSupplier(resultStore, true));
        current.addProcessor(name, () -> new StoreProcessor(resultStore), joined);
    }

    /**
     * Adds a {@link GroupedUpdateProcessor} named {@code name} below {@code from}, together with
     * two persistent message stores: {@code STORE_PREFIX + name} and
     * {@code STORE_PREFIX + from + "_mapping"}.
     *
     * <p>If no store named {@code STORE_PREFIX + from} has been registered, an error is logged
     * and the method carries on regardless.
     *
     * @param current topology to extend
     * @param topologyContext not used by this method
     * @param topologyConstructor receives store names, mappings and store builders
     * @param name name of the new processor
     * @param from name of the parent processor
     * @param keyExtractor handed to the {@link GroupedUpdateProcessor}
     * @return {@code name}, unchanged
     */
    public static String addGroupedProcessor(Topology current, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String name, String from, Function<ReplicationMessage, String> keyExtractor) {
        if (!topologyConstructor.stores.contains(STORE_PREFIX + from)) {
            // Parameterized like the other log calls in this class; the rendered message is unchanged.
            logger.error("Adding grouped with from, no source processor present for: {} created: {} and from: {}", from, topologyConstructor.stateStoreSupplier.keySet(), from);
        }
        String storeName = STORE_PREFIX + name;
        String mappingStoreName = STORE_PREFIX + from + "_mapping";
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, name, storeName);
        topologyConstructor.stores.add(storeName);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, name, mappingStoreName);
        topologyConstructor.stores.add(mappingStoreName);
        topologyConstructor.stateStoreSupplier.put(storeName, createMessageStoreSupplier(storeName, true));
        topologyConstructor.stateStoreSupplier.put(mappingStoreName, createMessageStoreSupplier(mappingStoreName, true));
        current.addProcessor(name, () -> new GroupedUpdateProcessor(storeName, keyExtractor, mappingStoreName), from);
        return name;
    }

    /**
     * Adds a {@link CacheProcessor} named {@code name} below {@code fromProcessorName} and
     * registers a persistent message store {@code STORE_PREFIX + name} for it.
     *
     * <p>NOTE(review): the processor is constructed with {@code name} while the store is
     * registered as {@code STORE_PREFIX + name}; presumably CacheProcessor derives the store
     * name itself - confirm.
     *
     * @param current topology to extend
     * @param topologyContext not used by this method
     * @param topologyConstructor receives the store mapping and the store builder
     * @param name name of the cache processor
     * @param fromProcessorName name of the parent processor
     * @param cacheTime passed through to the {@link CacheProcessor}
     * @param maxSize passed through to the {@link CacheProcessor}
     * @param inMemory passed through to the {@link CacheProcessor}
     */
    public static void addPersistentCache(Topology current, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String name, String fromProcessorName, Duration cacheTime, int maxSize, boolean inMemory) {
        final String cacheStore = STORE_PREFIX + name;
        current.addProcessor(name, () -> new CacheProcessor(name, cacheTime, maxSize, inMemory), fromProcessorName);
        logger.info("Buffer using statestore: {}", cacheStore);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, name, cacheStore);
        topologyConstructor.stateStoreSupplier.put(cacheStore, createMessageStoreSupplier(cacheStore, true));
    }

    /**
     * Builds a reduce sub-topology below the processor currently on top of {@code transformerNames}.
     *
     * <p>Layout, in the order the nodes are added:
     * <ol>
     *   <li>a {@link ReduceReadProcessor} fed by the current top of the stack;</li>
     *   <li>an {@link IfElseProcessor} routing non-DELETE messages to the "add" branch and the
     *       rest to the "remove" branch;</li>
     *   <li>one {@link IdentityProcessor} per branch, followed by the {@code onAdd} /
     *       {@code onRemove} components and a {@link PrimaryToSecondaryProcessor} each;</li>
     *   <li>a {@link StoreStateProcessor} joining both branches, writing to the accumulator store;</li>
     *   <li>optionally a materializing store processor (when {@code materialize} is true).</li>
     * </ol>
     *
     * <p>Side effect: the reader and if/else processor names are pushed onto
     * {@code transformerNames}; the qualified names below depend on the stack size at the moment
     * they are computed, so the statement order matters.
     *
     * @param topology topology to extend
     * @param topologyContext used to generate qualified processor names
     * @param topologyConstructor supplies new stream ids and receives store mappings/builders
     * @param transformerNames stack of processor names of the enclosing pipe; mutated
     * @param currentPipeId id of the enclosing pipe
     * @param onAdd components appended to the "add" branch
     * @param onRemove components appended to the "remove" branch
     * @param initialMessage passed through to the {@link ReduceReadProcessor}
     * @param materialize whether the reduce result is additionally written through a store processor
     * @param keyExtractor passed through to the {@link ReduceReadProcessor}
     * @return the qualified name of the reduce result processor
     */
    public static String addReducer(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, Stack<String> transformerNames, int currentPipeId, List<TopologyPipeComponent> onAdd, List<TopologyPipeComponent> onRemove, Function<ImmutableMessage, ImmutableMessage> initialMessage, boolean materialize, Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> keyExtractor) {
        final String upstream = transformerNames.peek();
        final String readerName = topologyContext.qualifiedName("reduce", transformerNames.size(), currentPipeId);
        transformerNames.push(readerName);
        final String selectorName = topologyContext.qualifiedName("ifelse", transformerNames.size(), currentPipeId);
        transformerNames.push(selectorName);

        final int addPipeId = topologyConstructor.generateNewStreamId();
        final int removePipeId = topologyConstructor.generateNewStreamId();
        final String addBranch = topologyContext.qualifiedName("addbranch", transformerNames.size(), currentPipeId);
        final String removeBranch = topologyContext.qualifiedName("removeBranch", transformerNames.size(), currentPipeId);
        final String reduceName = topologyContext.qualifiedName("reduce", transformerNames.size(), currentPipeId);
        final String accumulatorStore = STORE_PREFIX + "accumulator_" + reduceName;
        final String inputStore = STORE_PREFIX + "reduce_inputstore_" + reduceName;
        // When materializing, the state processor gets a "_proc" name so the store processor can take reduceName.
        final String stateProcessorName = materialize ? "_proc" + reduceName : reduceName;

        topology.addProcessor(readerName, () -> new ReduceReadProcessor(inputStore, accumulatorStore, initialMessage, keyExtractor), upstream);
        topology.addProcessor(selectorName, () -> new IfElseProcessor(msg -> msg.operation() != ReplicationMessage.Operation.DELETE, addBranch, Optional.of(removeBranch)), readerName);

        // Each branch works on its own copy of the name stack.
        Stack<String> addStack = new Stack<>();
        addStack.addAll(transformerNames);
        topology.addProcessor(addBranch, IdentityProcessor::new, addStack.peek());
        addStack.push(addBranch);

        Stack<String> removeStack = new Stack<>();
        removeStack.addAll(transformerNames);
        topology.addProcessor(removeBranch, IdentityProcessor::new, removeStack.peek());
        removeStack.push(removeBranch);

        for (TopologyPipeComponent addComponent : onAdd) {
            addComponent.addToTopology(addStack, addPipeId, topology, topologyContext, topologyConstructor);
        }
        final String addTail = topologyContext.qualifiedName("primToSecondaryAdd", transformerNames.size(), currentPipeId);
        topology.addProcessor(addTail, PrimaryToSecondaryProcessor::new, addStack.peek());
        addStack.push(addTail);

        for (TopologyPipeComponent removeComponent : onRemove) {
            removeComponent.addToTopology(removeStack, removePipeId, topology, topologyContext, topologyConstructor);
        }
        final String removeTail = topologyContext.qualifiedName("primToSecondaryRemove", transformerNames.size(), currentPipeId);
        topology.addProcessor(removeTail, PrimaryToSecondaryProcessor::new, removeStack.peek());
        removeStack.push(removeTail);

        topology.addProcessor(stateProcessorName, () -> new StoreStateProcessor(accumulatorStore), addStack.peek(), removeStack.peek());
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, stateProcessorName, accumulatorStore);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, readerName, accumulatorStore);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, readerName, inputStore);

        if (!topologyConstructor.immutableStoreSupplier.containsKey(accumulatorStore)) {
            topologyConstructor.immutableStoreSupplier.put(accumulatorStore, createImmutableMessageSupplier(accumulatorStore, true));
        }
        if (!topologyConstructor.stateStoreSupplier.containsKey(inputStore)) {
            topologyConstructor.stateStoreSupplier.put(inputStore, createMessageStoreSupplier(inputStore, true));
        }
        if (materialize) {
            addMaterializeStore(topology, topologyContext, topologyConstructor, reduceName, "_proc" + reduceName);
        }
        return reduceName;
    }

    /**
     * Wires a join between {@code fromProcessorName} and {@code withProcessorName}.
     *
     * <p>Two {@link PreJoinProcessor}s ({@code name-forwardpre}, {@code name-reversepre}) feed a
     * join processor: a {@link OneToManyGroupedProcessor} when {@code multiple} is true, otherwise
     * a {@link OneToOneProcessor} that attaches the joined message via {@code withParamMessage}.
     * The join processor is connected to the stores {@code STORE_PREFIX + fromProcessorName} and
     * {@code STORE_PREFIX + withProcessorName}.
     *
     * <p>When {@code materialize} is true the join processor is named {@code "proc_" + name} and
     * an additional {@link StoreProcessor} called {@code name} writes the result to a persistent
     * store {@code STORE_PREFIX + name}; otherwise the join processor itself is called {@code name}.
     *
     * @param current topology to extend
     * @param topologyContext not used by this method
     * @param topologyConstructor receives store names, mappings and store builders
     * @param fromProcessorName name of the primary-side processor
     * @param withProcessorName name of the secondary-side processor
     * @param name name under which the join result is available
     * @param optional passed through to the join processor
     * @param multiple selects the one-to-many variant of the join
     * @param materialize whether to store the join result
     * @param debug passed through to the {@link OneToManyGroupedProcessor} only
     */
    public static void addJoin(Topology current, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String fromProcessorName, String withProcessorName, String name, boolean optional, boolean multiple, boolean materialize, boolean debug) {
        String fromStore = STORE_PREFIX + fromProcessorName;
        String withStore = STORE_PREFIX + withProcessorName;
        String firstNamePre = name + "-forwardpre";
        String secondNamePre = name + "-reversepre";
        current.addProcessor(firstNamePre, () -> new PreJoinProcessor(false), fromProcessorName)
                .addProcessor(secondNamePre, () -> new PreJoinProcessor(true), withProcessorName);

        // Left raw: the processors' type parameters are declared outside this file.
        ProcessorSupplier proc = multiple
                ? () -> new OneToManyGroupedProcessor(fromStore, withStore, optional, debug)
                : () -> new OneToOneProcessor(fromStore, withStore, optional, (msg, comsg) -> msg.withParamMessage(comsg.message()));

        // Must be a String: it is used as a processor name and as a parent name below.
        String procName = materialize ? "proc_" + name : name;
        current.addProcessor(procName, proc, firstNamePre, secondNamePre);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, procName, withStore);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, procName, fromStore);

        if (materialize) {
            String resultStore = STORE_PREFIX + name;
            topologyConstructor.stores.add(resultStore);
            topologyConstructor.stateStoreSupplier.put(resultStore, createMessageStoreSupplier(resultStore, true));
            addStateStoreMapping(topologyConstructor.processorStateStoreMapper, name, resultStore);
            current.addProcessor(name, () -> new StoreProcessor(resultStore), procName);
        }
    }

    /**
     * Creates a key-value store builder with string keys and {@link ReplicationMessage} values.
     *
     * @param name name of the store
     * @param persistent {@code true} for a persistent store, {@code false} for an in-memory one
     *                   (the latter is logged at info level)
     * @return the store builder
     */
    public static StoreBuilder<KeyValueStore<String, ReplicationMessage>> createMessageStoreSupplier(String name, boolean persistent) {
        final KeyValueBytesStoreSupplier bytesStore;
        if (persistent) {
            bytesStore = Stores.persistentKeyValueStore(name);
        } else {
            logger.info("Creating non-persistent messagestore supplier: {}", name);
            bytesStore = Stores.inMemoryKeyValueStore(name);
        }
        return Stores.keyValueStoreBuilder(bytesStore, Serdes.String(), messageSerde);
    }

    /**
     * Creates a key-value store builder with string keys and {@link ImmutableMessage} values.
     *
     * @param name name of the store
     * @param persistent {@code true} for a persistent store, {@code false} for an in-memory one
     *                   (the latter is logged at info level)
     * @return the store builder
     */
    public static StoreBuilder<KeyValueStore<String, ImmutableMessage>> createImmutableMessageSupplier(String name, boolean persistent) {
        final KeyValueBytesStoreSupplier bytesStore;
        if (persistent) {
            bytesStore = Stores.persistentKeyValueStore(name);
        } else {
            logger.info("Creating non-persistent messagestore supplier: {}", name);
            bytesStore = Stores.inMemoryKeyValueStore(name);
        }
        return Stores.keyValueStoreBuilder(bytesStore, Serdes.String(), immutableMessageSerde);
    }

    /**
     * Creates a key-value store builder with string keys and {@link Long} values.
     *
     * @param name name of the store
     * @param persistent {@code true} for a persistent store, {@code false} for an in-memory one
     *                   (the latter is logged at info level)
     * @return the store builder
     */
    public static StoreBuilder<KeyValueStore<String, Long>> createLongStoreSupplier(String name, boolean persistent) {
        final KeyValueBytesStoreSupplier bytesStore;
        if (persistent) {
            bytesStore = Stores.persistentKeyValueStore(name);
        } else {
            logger.info("Creating non-persistent messagestore supplier: {}", name);
            bytesStore = Stores.inMemoryKeyValueStore(name);
        }
        return Stores.keyValueStoreBuilder(bytesStore, Serdes.String(), Serdes.Long());
    }

    /**
     * Flattening options exposed by this class.
     *
     * <p>NOTE(review): no method in this class reads this enum, so the exact meaning of each
     * constant is defined by its users elsewhere - confirm before documenting further.
     */
    public static enum Flatten {
        FIRST,
        LAST,
        NONE;

    }
}

