/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.jes.util;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import net.pincette.function.SideEffect;
import net.pincette.jes.util.JsonSerde;
import net.pincette.jes.util.Kafka;
import net.pincette.json.JsonUtil;
import net.pincette.util.Collections;
import net.pincette.util.Pair;
import net.pincette.util.TimedCache;
import net.pincette.util.Util;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;

public class Streams {
    // Field names of the JSON message assembled by startStopMessage.
    private static final String ACTION = "action";
    private static final String APPLICATION = "application";
    private static final String IN = "in";
    private static final String OUT = "out";
    // Values passed as the "action" field by startMessage and stopMessage respectively.
    private static final String START = "start";
    private static final String STOP = "stop";

    /** Prevents instantiation; the class is used through its static methods and nested types. */
    private Streams() {
    }

    /**
     * Shuts down every topology in the list.
     *
     * <p>When a life cycle listener is given, it is first told that each topology has stopped.
     * After that each Kafka Streams instance is closed and the latch is decremented once per entry.
     *
     * @param topologies the topologies to shut down.
     * @param latch the latch that is counted down once for each closed topology.
     * @param lifeCycle the listener to notify. It may be <code>null</code>.
     */
    private static void closeStreams(List<TopologyEntry> topologies, CountDownLatch latch, TopologyLifeCycle lifeCycle) {
        if (lifeCycle != null) {
            for (TopologyEntry entry : topologies) {
                lifeCycle.stopped(entry.topology, Streams.getApplication(entry.properties));
            }
        }
        for (TopologyEntry entry : topologies) {
            Streams.closeStreams(entry.streams);
            latch.countDown();
        }
    }

    /**
     * Closes a Kafka Streams instance and then calls its <code>cleanUp</code> method.
     *
     * @param streams the instance to shut down.
     */
    private static void closeStreams(KafkaStreams streams) {
        // cleanUp is only invoked after close has returned.
        streams.close();
        streams.cleanUp();
    }

    /**
     * Creates an independent copy of a set of properties.
     *
     * <p>String-valued entries are copied by name, which also picks up entries that are only
     * present in the defaults of <code>properties</code>. The direct entries are then copied as
     * they are, so values that are not strings, such as <code>Class</code> objects, are retained
     * instead of being dropped.
     *
     * @param properties the properties to copy. It is not modified.
     * @return A new <code>Properties</code> object that has no defaults of its own.
     */
    private static Properties copy(Properties properties) {
        Properties result = new Properties();
        // stringPropertyNames() also covers the defaults, but skips non-string keys and values.
        for (String name : properties.stringPropertyNames()) {
            result.setProperty(name, properties.getProperty(name));
        }
        // putAll only sees the direct entries, whatever their type; they take precedence.
        result.putAll(properties);
        return result;
    }

    /**
     * Filters a stream using a cache of criterion values.
     *
     * <p>A record is dropped when the cache already holds the value that <code>criterion</code>
     * produces for it. For each record that passes, the criterion value is put in the cache, as
     * both key and value, and a key/value pair with the record's key and value is emitted.
     *
     * @param stream the stream to filter.
     * @param criterion the function that computes the value on which records are compared.
     * @param window the duration handed to the cache constructor, presumably the time an entry
     *     stays in the cache (confirm against <code>TimedCache</code>).
     * @param <K> the key type.
     * @param <V> the value type.
     * @param <U> the criterion type.
     * @return The filtered stream.
     */
    public static <K, V, U> KStream<K, V> duplicateFilter(KStream<K, V> stream, BiFunction<K, V, U> criterion, Duration window) {
        // One cache per call, captured by both lambdas below.
        // NOTE(review): the raw TimedCache type and KeyValue<Object, Object> look like decompiler output; confirm the generic signatures.
        TimedCache cache = new TimedCache(window);
        // criterion is evaluated twice for a forwarded record: once in filterNot and once in map.
        // Optional.of throws NullPointerException when criterion returns null; that case does not fall through to orElse(null).
        return stream.filterNot((k, v) -> cache.get(criterion.apply(k, v)).isPresent()).map((k, v) -> Optional.of(criterion.apply(k, v)).map(c -> SideEffect.run(() -> cache.put(c, c)).andThenGet(() -> new KeyValue<Object, Object>(k, v))).orElse(null));
    }

    /**
     * Converts a section of a configuration into <code>Properties</code>.
     *
     * <p>Every entry of the configuration object found at <code>path</code> becomes a property
     * whose value is the string form of the unwrapped configuration value.
     *
     * @param config the configuration to read from.
     * @param path the path of the configuration object to convert.
     * @return A new <code>Properties</code> object with one entry per configuration entry.
     */
    public static Properties fromConfig(Config config, String path) {
        Properties result = new Properties();
        config.getConfig(path).entrySet().forEach(entry -> result.put(entry.getKey(), entry.getValue().unwrapped().toString()));
        return result;
    }

    /**
     * Reads the <code>application.id</code> property.
     *
     * @param properties the properties to read from.
     * @return The property value, or <code>null</code> when <code>getProperty</code> finds none.
     */
    private static String getApplication(Properties properties) {
        return properties.getProperty("application.id");
    }

    /**
     * Streams the nodes of a topology.
     *
     * <p>The traversal starts from the nodes of all subtopologies in the topology description and
     * uses a fresh set to keep track of the nodes that were already visited.
     *
     * @param topology the topology to describe.
     * @return The stream of nodes, which is evaluated lazily.
     */
    private static Stream<TopologyDescription.Node> getNodes(Topology topology) {
        Stream<TopologyDescription.Node> roots = topology.describe().subtopologies().stream().flatMap(sub -> sub.nodes().stream());
        Set<TopologyDescription.Node> visited = new HashSet<>();
        return Streams.getNodes(roots, visited);
    }

    /**
     * Expands a stream of nodes with the nodes connected to them.
     *
     * <p>Nodes that are already in <code>seen</code> are skipped. Every other node is added to
     * <code>seen</code> and emitted, followed by the result of applying this method recursively to
     * its predecessors and successors.
     *
     * @param nodes the nodes to start from.
     * @param seen the nodes visited so far. It is modified while the returned stream is consumed.
     * @return The lazily evaluated stream of nodes.
     */
    private static Stream<TopologyDescription.Node> getNodes(Stream<TopologyDescription.Node> nodes, Set<TopologyDescription.Node> seen) {
        // The pipeline is lazy: "seen" is only updated as the stream is consumed, and the recursive
        // call relies on the node having been added before its neighbours are filtered.
        return nodes.filter(n -> !seen.contains(n)).map(n -> SideEffect.run(() -> seen.add((TopologyDescription.Node)n)).andThenGet(() -> n)).flatMap(n -> Stream.concat(Stream.of(n), Streams.getNodes(Stream.concat(n.predecessors().stream(), n.successors().stream()), seen)));
    }

    /**
     * Tests whether a topic name contains the text <code>KSTREAM</code>.
     *
     * <p>NOTE(review): presumably this is meant to recognise topics that Kafka Streams generates
     * itself; confirm whether other generated name patterns should be covered as well.
     *
     * @param topic the topic name. It must not be <code>null</code>.
     * @return <code>true</code> when the name contains <code>KSTREAM</code>.
     */
    private static boolean internalTopic(String topic) {
        return topic.contains("KSTREAM");
    }

    /**
     * Starts one topology without a life cycle listener. It delegates to the variant with a
     * <code>TopologyLifeCycle</code> argument, for which it passes <code>null</code>.
     *
     * @param topology the topology to run.
     * @param properties the Kafka properties for the topology.
     * @return The result of the delegate.
     */
    public static boolean start(Topology topology, Properties properties) {
        return Streams.start(topology, properties, null);
    }

    /**
     * Starts one topology. It wraps the topology and its properties in a single-element stream and
     * delegates to the stream-based variant.
     *
     * @param topology the topology to run.
     * @param properties the Kafka properties for the topology.
     * @param lifeCycle the listener for start and stop notifications, passed on unchanged.
     * @return The result of the delegate.
     */
    public static boolean start(Topology topology, Properties properties, TopologyLifeCycle lifeCycle) {
        return Streams.start(Stream.of(Pair.pair(topology, properties)), lifeCycle);
    }

    /**
     * Starts several topologies without a life cycle listener. It delegates to the variant with a
     * <code>TopologyLifeCycle</code> argument, for which it passes <code>null</code>.
     *
     * @param topologies the pairs of topologies and their Kafka properties.
     * @return The result of the delegate.
     */
    public static boolean start(Stream<Pair<Topology, Properties>> topologies) {
        return Streams.start(topologies, null);
    }

    /**
     * Starts several topologies and blocks until all of them have been closed.
     *
     * <p>A Kafka Streams instance is created and started for each pair. When one of the instances
     * reports the <code>ERROR</code> state, all topologies are closed. A JVM shutdown hook that
     * closes all topologies is registered as well. The calling thread waits on a latch that is
     * counted down once for every closed topology.
     *
     * @param topologies the pairs of topologies and their Kafka properties.
     * @param lifeCycle the listener for start and stop notifications. It may be <code>null</code>.
     * @return <code>false</code> when an instance reported the <code>ERROR</code> state,
     *     <code>true</code> otherwise.
     */
    public static boolean start(Stream<Pair<Topology, Properties>> topologies, TopologyLifeCycle lifeCycle) {
        List<TopologyEntry> entries = Streams.topologyEntries(topologies);
        CountDownLatch allClosed = new CountDownLatch(entries.size());
        // A one-element array, so that the listener lambda can record the failure.
        boolean[] failed = new boolean[1];
        // NOTE(review): this can run from a state listener as well as from the shutdown hook, so
        // the topologies may be closed, and the life cycle listener notified, more than once.
        Runnable shutdown = () -> Streams.closeStreams(entries, allClosed, lifeCycle);
        for (TopologyEntry entry : entries) {
            entry.streams.setStateListener((newState, oldState) -> {
                if (!newState.equals(KafkaStreams.State.ERROR)) {
                    return;
                }
                failed[0] = true;
                shutdown.run();
            });
            entry.streams.start();
            if (lifeCycle != null) {
                lifeCycle.started(entry.topology, Streams.getApplication(entry.properties));
            }
        }
        Runtime.getRuntime().addShutdownHook(new Thread(shutdown));
        Util.tryToDoRethrow(allClosed::await);
        return !failed[0];
    }

    /**
     * Starts one topology without blocking and without a consumer for uncaught exceptions. It
     * delegates to the variant with an <code>uncaughtExceptions</code> argument, for which it
     * passes <code>null</code>.
     *
     * @param topology the topology to run.
     * @param properties the Kafka properties for the topology.
     * @param lifeCycle the listener for start and stop notifications, passed on unchanged.
     * @param onError the error callback, passed on unchanged.
     * @return The object returned by the delegate, with which the topology can be stopped.
     */
    public static Stop start(Topology topology, Properties properties, TopologyLifeCycle lifeCycle, BiConsumer<Stop, String> onError) {
        return Streams.start(topology, properties, lifeCycle, onError, null);
    }

    /**
     * Starts one topology and returns immediately.
     *
     * <p>All three callbacks are optional. When <code>onError</code> is given, it is called with
     * the returned <code>Stop</code> object and the application ID each time the Kafka Streams
     * instance reports the <code>ERROR</code> state. When <code>uncaughtExceptions</code> is given,
     * it receives every uncaught exception, after which the failed stream thread is replaced.
     *
     * @param topology the topology to run.
     * @param properties the Kafka properties for the topology. The application ID is read from
     *     <code>application.id</code>.
     * @param lifeCycle the listener that is notified after the start and when the returned object
     *     stops the topology. It may be <code>null</code>.
     * @param onError the error callback. It may be <code>null</code>, in which case no state
     *     listener is installed.
     * @param uncaughtExceptions the consumer of uncaught exceptions. It may be <code>null</code>,
     *     in which case no uncaught exception handler is installed.
     * @return The object with which the topology can be stopped.
     */
    public static Stop start(Topology topology, Properties properties, TopologyLifeCycle lifeCycle, BiConsumer<Stop, String> onError, Consumer<Throwable> uncaughtExceptions) {
        String application = Streams.getApplication(properties);
        KafkaStreams streams = new KafkaStreams(topology, Streams.streamsConfig(properties));
        Stopper stop = new Stopper(streams, topology, lifeCycle, application);
        // Guarded like the other optional callbacks; an unguarded null would only surface as a
        // NullPointerException inside the listener at the moment the ERROR state is reported.
        if (onError != null) {
            streams.setStateListener((newState, oldState) -> {
                if (newState.equals((Object)KafkaStreams.State.ERROR)) {
                    onError.accept(stop, application);
                }
            });
        }
        if (uncaughtExceptions != null) {
            streams.setUncaughtExceptionHandler(e -> {
                uncaughtExceptions.accept(e);
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
        }
        // Listener and handler are installed before the instance is started.
        streams.start();
        if (lifeCycle != null) {
            lifeCycle.started(topology, application);
        }
        return stop;
    }

    /**
     * Builds the life cycle message with the action <code>start</code>.
     *
     * @param application the application name for the <code>application</code> field.
     * @param inputTopics the topic names for the <code>in</code> field.
     * @param outputTopics the topic names for the <code>out</code> field.
     * @return The JSON message.
     */
    public static JsonObject startMessage(String application, Set<String> inputTopics, Set<String> outputTopics) {
        return Streams.startStopMessage(application, inputTopics, outputTopics, START);
    }

    /**
     * Builds a life cycle message.
     *
     * <p>The fields are added in the order <code>application</code>, <code>in</code>,
     * <code>out</code> and <code>action</code>. The topic sets are written as JSON arrays.
     *
     * @param application the application name.
     * @param inputTopics the topic names for the <code>in</code> array.
     * @param outputTopics the topic names for the <code>out</code> array.
     * @param action the value of the <code>action</code> field.
     * @return The JSON message.
     */
    private static JsonObject startStopMessage(String application, Set<String> inputTopics, Set<String> outputTopics, String action) {
        JsonArrayBuilder in = Streams.toJsonArray(inputTopics);
        JsonArrayBuilder out = Streams.toJsonArray(outputTopics);
        return JsonUtil.createObjectBuilder()
            .add(APPLICATION, application)
            .add(IN, in)
            .add(OUT, out)
            .add(ACTION, action)
            .build();
    }

    /**
     * Builds the life cycle message with the action <code>stop</code>.
     *
     * @param application the application name for the <code>application</code> field.
     * @param inputTopics the topic names for the <code>in</code> field.
     * @param outputTopics the topic names for the <code>out</code> field.
     * @return The JSON message.
     */
    public static JsonObject stopMessage(String application, Set<String> inputTopics, Set<String> outputTopics) {
        return Streams.startStopMessage(application, inputTopics, outputTopics, STOP);
    }

    /**
     * Derives a Kafka Streams configuration from Kafka properties.
     *
     * <p>The given properties are copied and the following settings are added to the copy when it
     * holds no value for them:
     *
     * <ul>
     *   <li><code>default.deserialization.exception.handler</code>:
     *       <code>LogAndContinueExceptionHandler</code>
     *   <li><code>default.key.serde</code>: the class of the string serde
     *   <li><code>default.value.serde</code>: <code>JsonSerde</code>
     *   <li><code>processing.guarantee</code>: <code>at_least_once</code>
     * </ul>
     *
     * <p>A key counts as set when the copy holds any value for it, including one that is not a
     * string, such as a <code>Class</code> object. Such a value is never replaced by a default.
     *
     * @param kafkaConfig the properties to start from. It is not modified.
     * @return The new configuration.
     */
    public static Properties streamsConfig(Properties kafkaConfig) {
        Properties result = Streams.copy(kafkaConfig);
        // putIfAbsent looks at the raw entry, whereas getProperty reports null for any value that
        // is not a String and would therefore treat a Class-valued setting as missing.
        result.putIfAbsent("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        result.putIfAbsent("default.key.serde", Serdes.String().getClass());
        result.putIfAbsent("default.value.serde", JsonSerde.class);
        result.putIfAbsent("processing.guarantee", "at_least_once");
        return result;
    }

    /**
     * Collects a set of strings in a JSON array builder.
     *
     * @param values the strings to add, in the iteration order of the set.
     * @return The builder that results from adding all values.
     */
    private static JsonArrayBuilder toJsonArray(Set<String> values) {
        JsonArrayBuilder builder = JsonUtil.createArrayBuilder();
        for (String value : values) {
            // Continue with whatever add returns, as a fold would.
            builder = builder.add(value);
        }
        return builder;
    }

    /**
     * Collects the topics a topology reads from and writes to.
     *
     * <p>All nodes of the topology are visited. The topics of source nodes go in the first set and
     * the topics of sink nodes in the second one. Topics for which <code>internalTopic</code>
     * returns <code>true</code> are left out by the per-node helpers.
     *
     * @param topology the topology to inspect.
     * @return The pair of which the first element holds the source topics and the second element
     *     the sink topics.
     */
    public static Pair<Set<String>, Set<String>> topics(Topology topology) {
        Set<String> consumed = new HashSet<>();
        Set<String> produced = new HashSet<>();
        Streams.getNodes(topology).forEach(node -> {
            if (node instanceof TopologyDescription.Source) {
                consumed.addAll(Streams.topics((TopologyDescription.Source)node));
            } else if (node instanceof TopologyDescription.Sink) {
                produced.addAll(Streams.topics((TopologyDescription.Sink)node));
            }
        });
        return Pair.pair(consumed, produced);
    }

    /**
     * Returns the topics of a source node for which <code>internalTopic</code> is
     * <code>false</code>.
     *
     * <p>A source node without a topic set yields an empty set, in the same way as the sink
     * variant treats a sink without a topic name.
     *
     * @param source the source node.
     * @return The set of topic names, which may be empty.
     */
    private static Set<String> topics(TopologyDescription.Source source) {
        Set<String> names = source.topicSet();
        // NOTE(review): a source that subscribes with a pattern presumably has no topic set; the
        // guard avoids a NullPointerException for such topologies.
        if (names == null) {
            return new HashSet<>();
        }
        return names.stream().filter(t -> !Streams.internalTopic(t)).collect(Collectors.toSet());
    }

    /**
     * Returns the topic of a sink node as a set.
     *
     * @param sink the sink node.
     * @return The empty set when the sink has no topic name or when <code>internalTopic</code> is
     *     <code>true</code> for it, otherwise a set with that one name.
     */
    private static Set<String> topics(TopologyDescription.Sink sink) {
        String topic = sink.topic();
        if (topic == null || Streams.internalTopic(topic)) {
            return java.util.Collections.emptySet();
        }
        return Collections.set(topic);
    }

    /**
     * Creates a Kafka Streams instance for each topology.
     *
     * <p>The instance is constructed with the result of <code>streamsConfig</code> for the
     * properties of the pair. It is not started here.
     *
     * @param topologies the pairs of topologies and their Kafka properties.
     * @return The list of entries, in the order of the stream.
     */
    private static List<TopologyEntry> topologyEntries(Stream<Pair<Topology, Properties>> topologies) {
        return topologies.map(pair -> {
            Topology topology = pair.first;
            Properties properties = pair.second;
            KafkaStreams streams = new KafkaStreams(topology, Streams.streamsConfig(properties));
            return new TopologyEntry(topology, properties, streams);
        }).collect(Collectors.toList());
    }

    /**
     * A life cycle listener that publishes a JSON message on a Kafka topic each time a topology
     * is started or stopped. The message is keyed with a random UUID.
     */
    public static class TopologyLifeCycleEmitter
    implements TopologyLifeCycle {
        private final KafkaProducer<String, JsonObject> producer;
        private final String topic;

        /**
         * Creates the emitter.
         *
         * @param topic the topic on which the messages are published.
         * @param producer the producer used to send the messages.
         */
        public TopologyLifeCycleEmitter(String topic, KafkaProducer<String, JsonObject> producer) {
            this.topic = topic;
            this.producer = producer;
        }

        /**
         * Builds the message for the topology and sends it. The call waits for the completion of
         * the send operation.
         */
        private void sendMessage(Topology topology, String application, String action) {
            Pair<Set<String>, Set<String>> inOut = Streams.topics(topology);
            JsonObject message = Streams.startStopMessage(application, inOut.first, inOut.second, action);
            ProducerRecord<String, JsonObject> producerRecord = new ProducerRecord<>(this.topic, UUID.randomUUID().toString(), message);
            Kafka.send(this.producer, producerRecord).toCompletableFuture().join();
        }

        /** Publishes the message with the action <code>start</code>. */
        @Override
        public void started(Topology topology, String application) {
            this.sendMessage(topology, application, Streams.START);
        }

        /** Publishes the message with the action <code>stop</code>. */
        @Override
        public void stopped(Topology topology, String application) {
            this.sendMessage(topology, application, Streams.STOP);
        }
    }

    /**
     * Holds a topology together with the properties it was given and the Kafka Streams instance
     * that was created for it.
     */
    private static class TopologyEntry {
        private final Properties properties;
        private final KafkaStreams streams;
        private final Topology topology;

        private TopologyEntry(Topology topology, Properties properties, KafkaStreams streams) {
            this.topology = topology;
            this.properties = properties;
            this.streams = streams;
        }
    }

    /**
     * The <code>Stop</code> implementation that closes one Kafka Streams instance and notifies the
     * life cycle listener. Calls after the first completed one have no effect.
     *
     * <p>NOTE(review): the <code>stopped</code> flag is a plain field; confirm whether
     * <code>stop</code> can be called from more than one thread.
     */
    private static class Stopper
    implements Stop {
        private final String application;
        private final TopologyLifeCycle lifeCycle;
        private final KafkaStreams streams;
        private final Topology topology;
        private boolean stopped;

        private Stopper(KafkaStreams streams, Topology topology, TopologyLifeCycle lifeCycle, String application) {
            this.streams = streams;
            this.topology = topology;
            this.lifeCycle = lifeCycle;
            this.application = application;
        }

        /**
         * Closes the streams instance, then tells the life cycle listener, if there is one, that
         * the topology has stopped. The flag is only set after both steps.
         */
        @Override
        public void stop() {
            if (this.stopped) {
                return;
            }
            Streams.closeStreams(this.streams);
            if (this.lifeCycle != null) {
                this.lifeCycle.stopped(this.topology, this.application);
            }
            this.stopped = true;
        }
    }

    /** A life cycle listener that does nothing in either callback. */
    public static class NopTopologyLifeCycle
    implements TopologyLifeCycle {
        @Override
        public void started(Topology topology, String application) {
            // Intentionally empty.
        }

        @Override
        public void stopped(Topology topology, String application) {
            // Intentionally empty.
        }
    }

    /** The callbacks that are invoked when a topology has been started or stopped. */
    public interface TopologyLifeCycle {
        /**
         * Chains another listener after this one.
         *
         * @param next the listener that is called after this one, for both callbacks.
         * @return A listener that calls this listener first and then <code>next</code>.
         */
        default TopologyLifeCycle andThen(final TopologyLifeCycle next) {
            final TopologyLifeCycle first = this;
            return new TopologyLifeCycle() {
                @Override
                public void started(Topology topology, String application) {
                    first.started(topology, application);
                    next.started(topology, application);
                }

                @Override
                public void stopped(Topology topology, String application) {
                    first.stopped(topology, application);
                    next.stopped(topology, application);
                }
            };
        }

        /**
         * Called when a topology has been started.
         *
         * @param topology the topology.
         * @param application the application ID of the topology.
         */
        void started(Topology topology, String application);

        /**
         * Called when a topology is stopped.
         *
         * @param topology the topology.
         * @param application the application ID of the topology.
         */
        void stopped(Topology topology, String application);
    }

    /** The handle returned by the non-blocking <code>start</code> variants to stop a topology. */
    @FunctionalInterface
    public static interface Stop {
        /** Stops the topology this handle belongs to. */
        public void stop();
    }
}

