/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.jes.util;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.json.JsonObject;
import net.pincette.function.SideEffect;
import net.pincette.jes.util.Util;
import net.pincette.json.JsonUtil;
import net.pincette.util.Collections;
import org.apache.kafka.streams.kstream.KStream;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.RequestBuilder;

public class Fanout {
    private static final AsyncHttpClient client = Dsl.asyncHttpClient();

    private Fanout() {
    }

    public static void connect(KStream<String, JsonObject> replies, String realmId, String realmKey) {
        Fanout.connect(replies, null, realmId, realmKey);
    }

    public static void connect(KStream<String, JsonObject> replies, Set<String> exclude, String realmId, String realmKey) {
        replies.mapValues(v -> net.pincette.util.Util.tryToGetRethrow(() -> Fanout.send(v, exclude, realmId, realmKey).toCompletableFuture().get()).orElse(false));
    }

    public static void connect(KStream<String, JsonObject> replies, String realmId, String realmKey, Logger logger) {
        Fanout.connect(replies, null, realmId, realmKey, logger);
    }

    public static void connect(KStream<String, JsonObject> replies, Set<String> exclude, String realmId, String realmKey, Logger logger) {
        replies.mapValues(v -> net.pincette.util.Util.tryToGet(() -> Fanout.send(v, exclude, realmId, realmKey).toCompletableFuture().get(), e -> SideEffect.run(() -> logger.log(Level.SEVERE, e.getMessage(), (Throwable)e)).andThenGet(() -> false)).orElse(false));
    }

    private static JsonObject createMessage(JsonObject json, Set<String> usernames) {
        String asString = JsonUtil.string(json);
        return JsonUtil.createObjectBuilder().add("items", usernames.stream().reduce(JsonUtil.createArrayBuilder(), (b, u) -> b.add(JsonUtil.createObjectBuilder().add("channel", (String)u).add("formats", JsonUtil.createObjectBuilder().add("http-stream", JsonUtil.createObjectBuilder().add("content", "event: message\ndata:" + asString + "\n\n")))), (b1, b2) -> b1)).build();
    }

    private static String password(String realmId, String realmKey) {
        return net.pincette.util.Util.tryToGetRethrow(() -> Base64.getEncoder().encodeToString((realmId + ":" + realmKey).getBytes(StandardCharsets.UTF_8))).orElse(null);
    }

    public static CompletionStage<Boolean> send(JsonObject json, String realmId, String realmKey) {
        return Fanout.send(json, null, realmId, realmKey);
    }

    public static CompletionStage<Boolean> send(JsonObject json, Set<String> exclude, String realmId, String realmKey) {
        return Fanout.usernames(json, exclude != null ? exclude : new HashSet()).map(usernames -> client.executeRequest(((RequestBuilder)((RequestBuilder)((RequestBuilder)((RequestBuilder)((RequestBuilder)new RequestBuilder().setUrl(Fanout.url(realmId))).setMethod("POST")).addHeader((CharSequence)"Content-Type", "application/json")).addHeader((CharSequence)"Authorization", "Basic " + Fanout.password(realmId, realmKey))).setBody(JsonUtil.string(Fanout.createMessage(json, usernames)))).build()).toCompletableFuture().thenApply(response -> response.getStatusCode() == 200)).orElseGet(() -> CompletableFuture.completedFuture(true));
    }

    private static Set<String> subscribers(JsonObject json) {
        return JsonUtil.getObjects(json, "_subscriptions").map(subscription -> subscription.getString("sub", null)).collect(Collectors.toSet());
    }

    private static String url(String realmId) {
        return "https://api.fanout.io/realm/" + realmId + "/publish/";
    }

    private static Optional<Set<String>> usernames(JsonObject json, Set<String> exclude) {
        return Optional.of(Collections.difference(Collections.union(Util.getUsername(json).map(xva$0 -> Collections.set(xva$0)).orElseGet(HashSet::new), Fanout.subscribers(json)), exclude)).filter(usernames -> !usernames.isEmpty());
    }
}

