/*
 * Decompiled with CFR 0.152.
 */
package io.wizzie.enricher.builder;

import io.wizzie.bootstrapper.builder.Config;
import io.wizzie.enricher.enrichment.join.BaseJoiner;
import io.wizzie.enricher.enrichment.join.Joiner;
import io.wizzie.enricher.enrichment.join.QueryableBackJoiner;
import io.wizzie.enricher.enrichment.join.QueryableJoiner;
import io.wizzie.enricher.enrichment.simple.BaseEnrich;
import io.wizzie.enricher.enrichment.simple.Enrich;
import io.wizzie.enricher.exceptions.EnricherNotFound;
import io.wizzie.enricher.exceptions.JoinerNotFound;
import io.wizzie.enricher.model.PlanModel;
import io.wizzie.enricher.model.exceptions.PlanBuilderException;
import io.wizzie.enricher.query.antlr4.Join;
import io.wizzie.enricher.query.antlr4.Query;
import io.wizzie.enricher.query.antlr4.Select;
import io.wizzie.enricher.query.antlr4.Stream;
import io.wizzie.metrics.MetricsManager;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates a {@link PlanModel} into a Kafka Streams {@link StreamsBuilder}.
 *
 * <p>For every query of the plan the builder creates a source stream, joins it with the tables
 * named by the query, applies the requested enrichers and finally sends the result to the
 * query's insert topic. The per-build state (streams, tables, joiners, enrichers) is reset at the
 * start of every {@link #builder(PlanModel)} call and by {@link #close()}.
 *
 * <p>NOTE: the Kafka Streams DSL calls below are issued in a fixed order on purpose; changing the
 * number or order of DSL operations changes the generated topology.
 */
public class StreamBuilder {
    private static final Logger log = LoggerFactory.getLogger(StreamBuilder.class);

    /** Config flag: when true, topic names are prefixed with the application id. */
    private static final String MULTI_ID = "multi.id";
    /** Config key: topics that are never prefixed with the application id. */
    private static final String GLOBAL_TOPICS = "global.topics";
    /** Config flag: when true, records with a null key skip the joins and enrichers. */
    private static final String BYPASS_NULL_KEYS = "bypass.null.keys";
    /** Dimension wildcard: keep the whole message. */
    private static final String ALL_DIMENSIONS = "*";
    /** Partition key value meaning "keep the record key as it is". */
    private static final String RECORD_KEY = "__KEY";
    /** Topic that receives the reports of the failed queryable joins. */
    private static final String QUERYABLE_TOPIC = "__enricher_queryable";
    /** Field of a queryable join result holding the Boolean outcome of the join. */
    private static final String JOINER_STATUS = "joiner-status";
    /** Field of a queryable join result holding the wrapped message. */
    private static final String MESSAGE = "message";
    /** Field added to a failed-join report with the name of the joined table. */
    private static final String TABLE = "table";

    String appId;
    MetricsManager metricsManager;
    Config config;
    /** Current stream of every query, keyed by the query's key in the plan. */
    Map<String, KStream<String, Map<String, Object>>> streams;
    /** Tables already registered in the topology, keyed by their (resolved) topic name. */
    Map<String, KTable<String, Map<String, Object>>> tables;
    Map<String, Joiner> joiners = new HashMap<>();
    Map<String, Enrich> enrichers = new HashMap<>();
    List<String> globalTopics;

    /**
     * Creates a builder bound to the given configuration.
     *
     * @param config application configuration; {@code application.id} and {@code global.topics}
     *               are read here, the remaining flags are read while building
     * @param metricsManager metrics manager handed to every enricher on initialization
     */
    public StreamBuilder(Config config, MetricsManager metricsManager) {
        this.appId = (String) config.get("application.id");
        this.config = config;
        this.metricsManager = metricsManager;
        this.streams = new HashMap<>();
        this.tables = new HashMap<>();
        this.globalTopics = config.getOrDefault(GLOBAL_TOPICS, new LinkedList<String>());
    }

    /**
     * Validates the plan and builds the topology that implements it.
     *
     * @param model the enrichment plan
     * @return a new {@link StreamsBuilder} holding the whole topology
     * @throws PlanBuilderException declared for plan validation failures
     */
    public StreamsBuilder builder(PlanModel model) throws PlanBuilderException {
        model.validate(this.config.clone());
        this.clean();
        StreamsBuilder builder = new StreamsBuilder();
        this.buildInstances(model);
        this.addStreams(model, builder);
        this.addTables(model, builder);
        this.addEnriches(model);
        this.addInserts(model);
        return builder;
    }

    /**
     * Instantiates and initializes the enrichers and joiners declared by the plan.
     *
     * <p>A class that cannot be found or instantiated is logged and skipped; queries that
     * reference it fail later with {@link EnricherNotFound} / {@link JoinerNotFound}.
     */
    private void buildInstances(PlanModel model) {
        model.getEnrichers().forEach(enrichModel -> {
            try {
                Enrich enrich = this.<Enrich>makeInstance(enrichModel.getClassName());
                enrich.init(enrichModel.getProperties(), this.metricsManager);
                this.enrichers.put(enrichModel.getName(), enrich);
            } catch (ClassNotFoundException e) {
                // The trailing throwable is logged as the cause by SLF4J.
                log.error("Couldn't find the class associated with the function {}", enrichModel.getClassName(), e);
            } catch (IllegalAccessException | InstantiationException e) {
                log.error("Couldn't create the instance associated with the function " + enrichModel.getClassName(), e);
            }
        });
        model.getJoiners().forEach(joinerModel -> {
            try {
                Joiner joiner = this.<Joiner>makeInstance(joinerModel.getClassName());
                joiner.init(joinerModel.getName());
                this.joiners.put(joinerModel.getName(), joiner);
            } catch (ClassNotFoundException e) {
                log.error("Couldn't find the class associated with the function {}", joinerModel.getClassName(), e);
            } catch (IllegalAccessException | InstantiationException e) {
                log.error("Couldn't create the instance associated with the function " + joinerModel.getClassName(), e);
            }
        });
    }

    /**
     * Creates the source stream of every query from the non-table streams of its select clause.
     *
     * <p>Unless the select asks for {@code *}, messages are reduced to the selected dimensions.
     * When {@code bypass.null.keys} is enabled, records with a null key are sent straight to the
     * query's output topic and only keyed records continue through the pipeline.
     */
    @SuppressWarnings("unchecked")
    private void addStreams(PlanModel model, StreamsBuilder builder) {
        model.getQueries().entrySet().forEach(entry -> {
            String queryName = entry.getKey();
            Query query = entry.getValue();
            Select selectQuery = query.getSelect();

            List<String> topicStreams = selectQuery.getStreams().stream()
                    .filter(s -> !s.isTable())
                    .map(Stream::getName)
                    .map(this::inputTopic)
                    .collect(Collectors.toList());
            KStream<String, Map<String, Object>> stream = builder.stream(topicStreams);

            List<String> dimensions = selectQuery.getDimensions();
            if (!dimensions.contains(ALL_DIMENSIONS)) {
                stream = stream.mapValues(value -> selectDimensions(value, dimensions));
            }

            if (this.config.getOrDefault(BYPASS_NULL_KEYS, false).booleanValue()) {
                KStream<String, Map<String, Object>>[] splitBranch =
                        stream.branch((k, v) -> k != null, (k, v) -> k == null);
                stream = splitBranch[0];
                splitBranch[1].to(this.outputTopic(query.getInsert().getName()));
            }
            this.streams.put(queryName, stream);
        });
    }

    /**
     * Applies, in declaration order, every join of every query to the query's current stream.
     */
    private void addTables(PlanModel model, StreamsBuilder builder) {
        model.getQueries().entrySet().forEach(entry -> {
            String queryName = entry.getKey();
            Query query = entry.getValue();
            query.getJoins().forEach(join -> this.streams.put(
                    queryName, this.joinWithTable(this.streams.get(queryName), query, join, builder)));
        });
    }

    /**
     * Left-joins {@code stream} with the table named by {@code join}.
     *
     * @param stream the query's current stream
     * @param query the query that owns the join
     * @param join the join clause to apply
     * @param builder topology builder used to register the table on first use
     * @return the joined stream; the re-keyed input when the joiner is of no known kind
     * @throws JoinerNotFound if no joiner is registered under the join's joiner name
     */
    private KStream<String, Map<String, Object>> joinWithTable(
            KStream<String, Map<String, Object>> stream, Query query, Join join, StreamsBuilder builder) {
        if (!join.getStream().isTable()) {
            log.warn("Join beetween stream isn't supported yet! The join is changed to use stream-table join");
        }
        String tableName = this.inputTopic(join.getStream().getName());
        KTable<String, Map<String, Object>> table = this.tableFor(tableName, join.getDimensions(), builder);
        KStream<String, Map<String, Object>> keyed = rekey(stream, join.getPartitionKey());

        Joiner joiner = this.joiners.get(join.getJoinerName());
        if (joiner == null) {
            throw new JoinerNotFound("BaseJoiner " + join.getJoinerName() + " not found!");
        }

        if (joiner instanceof BaseJoiner) {
            return keyed.leftJoin(table, (BaseJoiner) joiner);
        }
        if (joiner instanceof QueryableJoiner) {
            KStream<String, Map<String, Object>> joinStream = keyed.leftJoin(table, (QueryableJoiner) joiner);
            this.reportFailedJoins(joinStream, tableName);
            // Failed or not, every message continues down the pipeline.
            return joinStream.mapValues(result -> unwrapMessage(result));
        }
        if (joiner instanceof QueryableBackJoiner) {
            KStream<String, Map<String, Object>> joinStream = keyed.leftJoin(table, (QueryableBackJoiner) joiner);
            this.reportFailedJoins(joinStream, tableName);
            List<String> topics = query.getSelect().getStreams().stream()
                    .filter(s -> !s.isTable())
                    .map(Stream::getName)
                    .collect(Collectors.toList());
            // Messages whose join failed are sent back to the first source topic of the query.
            // NOTE(review): this topic name is not prefixed with the application id even when
            // "multi.id" is enabled, unlike the topics the source stream reads - confirm intent.
            joinStream.branch((key, value) -> !joinSucceeded(value))[0]
                    .mapValues(result -> unwrapMessage(result))
                    .to(topics.get(0));
            // Only successfully joined messages continue down the pipeline.
            return joinStream.filter((key, value) -> joinSucceeded(value))
                    .mapValues(result -> unwrapMessage(result));
        }
        return keyed;
    }

    /**
     * Returns the table read from {@code tableName}, registering it in the topology on first use,
     * reduced to {@code dimensions} unless they contain {@code *}.
     *
     * <p>Only the unprojected table is cached, so different joins can select different dimensions
     * of the same table.
     */
    private KTable<String, Map<String, Object>> tableFor(
            String tableName, List<String> dimensions, StreamsBuilder builder) {
        KTable<String, Map<String, Object>> table = this.tables.get(tableName);
        if (table == null) {
            table = builder.table(tableName);
            this.tables.put(tableName, table);
        }
        if (dimensions.contains(ALL_DIMENSIONS)) {
            return table;
        }
        return table.mapValues(value -> selectDimensions(value, dimensions));
    }

    /**
     * Re-keys {@code stream} by the string form of the {@code partitionKey} dimension, unless the
     * partition key is {@code __KEY}. Records where that dimension is missing or null keep their
     * original key.
     */
    private static KStream<String, Map<String, Object>> rekey(
            KStream<String, Map<String, Object>> stream, String partitionKey) {
        if (partitionKey.equals(RECORD_KEY)) {
            return stream;
        }
        return stream.selectKey((key, value) -> {
            Object partitionValue = value.get(partitionKey);
            return partitionValue != null ? partitionValue.toString() : key;
        });
    }

    /**
     * Sends a report of every failed join of {@code joinStream} to {@code __enricher_queryable}:
     * a copy of the join result without its {@code message} field plus the joined table's name.
     */
    @SuppressWarnings("unchecked")
    private void reportFailedJoins(KStream<String, Map<String, Object>> joinStream, String tableName) {
        joinStream.branch((key, value) -> !joinSucceeded(value))[0]
                .mapValues(value -> {
                    Map<String, Object> report = new HashMap<>(value);
                    report.remove(MESSAGE);
                    report.put(TABLE, tableName);
                    return report;
                })
                .to(QUERYABLE_TOPIC);
    }

    /**
     * Applies, in declaration order, the enrichers requested by every query.
     *
     * @throws EnricherNotFound if a query references an enricher that is not registered
     */
    private void addEnriches(PlanModel model) {
        model.getQueries().entrySet().forEach(entry -> {
            String queryName = entry.getKey();
            entry.getValue().getEnrichWiths().forEach(enrichName -> {
                KStream<String, Map<String, Object>> stream = this.streams.get(queryName);
                Enrich enrich = this.enrichers.get(enrichName);
                if (enrich == null) {
                    throw new EnricherNotFound("Enricher " + enrichName + " not found!");
                }
                if (enrich instanceof BaseEnrich) {
                    stream = stream.mapValues((BaseEnrich) enrich);
                } else {
                    log.error("WTF!! The enrich {} isn't a enrich!", enrichName);
                }
                this.streams.put(queryName, stream);
            });
        });
    }

    /** Sends the final stream of every query to the query's insert topic. */
    private void addInserts(PlanModel model) {
        model.getQueries().entrySet().forEach(entry -> {
            Stream insert = entry.getValue().getInsert();
            KStream<String, Map<String, Object>> stream = this.streams.get(entry.getKey());
            stream.to(this.outputTopic(insert.getName()));
        });
    }

    /** Tells whether topic names must be prefixed with the application id. */
    private boolean isMultiId() {
        return this.config.getOrDefault(MULTI_ID, false).booleanValue();
    }

    /**
     * Resolves the name of a topic to read from: prefixed with the application id when
     * {@code multi.id} is enabled and the topic is not a global one.
     */
    private String inputTopic(String topic) {
        if (this.isMultiId() && !this.globalTopics.contains(topic)) {
            return String.format("%s_%s", this.appId, topic);
        }
        return topic;
    }

    /**
     * Resolves the name of a topic to write to: prefixed with the application id when
     * {@code multi.id} is enabled (global topics are not considered here).
     */
    private String outputTopic(String topic) {
        return this.isMultiId() ? String.format("%s_%s", this.appId, topic) : topic;
    }

    /** Copies into a new map the entries of {@code value} whose key is listed in {@code dimensions}. */
    private static Map<String, Object> selectDimensions(Map<String, Object> value, List<String> dimensions) {
        Map<String, Object> selected = new HashMap<>();
        for (String dimension : dimensions) {
            if (value.containsKey(dimension)) {
                selected.put(dimension, value.get(dimension));
            }
        }
        return selected;
    }

    /** Reads the {@code joiner-status} flag of a queryable join result. */
    private static boolean joinSucceeded(Map<String, Object> joinResult) {
        return (Boolean) joinResult.get(JOINER_STATUS);
    }

    /** Extracts the message wrapped in the {@code message} field of a queryable join result. */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> unwrapMessage(Map<String, Object> joinResult) {
        return (Map<String, Object>) joinResult.get(MESSAGE);
    }

    /** Drops every stream, table, joiner and enricher kept from a previous build. */
    private void clean() {
        this.streams.clear();
        this.tables.clear();
        this.joiners.clear();
        this.enrichers.clear();
    }

    /**
     * Instantiates {@code className} through its no-argument constructor.
     *
     * @param className fully qualified name of the class to instantiate
     * @param <T> type the caller expects; not checked here, a mismatch surfaces as a
     *            {@link ClassCastException} at the call site
     * @return the new instance
     */
    @SuppressWarnings("unchecked")
    private <T> T makeInstance(String className) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
        Class<?> funcClass = Class.forName(className);
        return (T) funcClass.newInstance();
    }

    /** Releases the state kept from the last build. */
    public void close() {
        this.clean();
    }
}

