/*
 * Decompiled with CFR 0.152.
 */
package io.kgraph.rest.server.graph;

import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.library.GraphAlgorithmType;
import io.kgraph.pregel.ComputeFunction;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.rest.server.KafkaGraphsProperties;
import io.kgraph.rest.server.graph.GraphAlgorithmCreateRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmHandler;
import io.kgraph.rest.server.graph.GraphAlgorithmId;
import io.kgraph.rest.server.graph.GraphAlgorithmResultRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmRunRequest;
import io.kgraph.rest.server.graph.GraphAlgorithmStatus;
import io.kgraph.rest.server.graph.GroupEdgesBySourceRequest;
import io.kgraph.rest.server.graph.KeyValue;
import io.kgraph.tools.importer.GraphImporter;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.nodes.GroupMember;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.reactive.context.ReactiveWebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/*
 * Exception performing whole class analysis ignored.
 */
@Component
@EnableConfigurationProperties(value={KafkaGraphsProperties.class})
public class GraphAlgorithmHandler<EV>
implements ApplicationListener<ReactiveWebServerInitializedEvent> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(GraphAlgorithmHandler.class);
    /**
     * Name of the request header that carries an application id. The handlers in this class
     * only fan a request out to the other group members when this header is absent.
     */
    private static final String X_KGRAPH_APPID = "X-KGraph-AppId";
    /** Periodic tick: first emission after 100 ms, then one every 2 s. Not referenced in this class. */
    private static final Flux<Long> INTERVAL = Flux.interval(Duration.ofMillis(100L), Duration.ofSeconds(2L));
    /** Application properties; this class reads the Kafka bootstrap servers from it. */
    private final KafkaGraphsProperties props;
    /** Curator client used for group membership and handed to every created algorithm. */
    private final CuratorFramework curator;
    /** Local host address, resolved once in the constructor. */
    private final String host;
    /** Web server port; assigned in {@code onApplicationEvent}, so it is 0 until that event arrives. */
    private int port;
    /** Group membership handle; assigned in {@code onApplicationEvent}, so it is null until then. */
    private GroupMember group;
    /** Algorithms configured on this node, keyed by application id. */
    private final ConcurrentMap<String, PregelGraphAlgorithm<?, ?, ?, ?>> algorithms = new ConcurrentHashMap<>();

    /**
     * Creates the handler and resolves the local host address once, up front.
     *
     * @param props   application properties (source of the Kafka bootstrap servers)
     * @param curator Curator client used for group membership
     * @throws RuntimeException if the local host address cannot be resolved
     */
    public GraphAlgorithmHandler(KafkaGraphsProperties props, CuratorFramework curator) {
        this.props = props;
        this.curator = curator;
        // Resolved last so both collaborators are already assigned when the lookup runs.
        this.host = getHostAddress();
    }

    /**
     * Records the port the reactive web server actually bound to and joins the
     * membership group under {@code /kafka-graphs/group}, using {@code host:port} as this
     * node's member id.
     *
     * @param event the web-server-initialized event published by Spring Boot
     */
    @Override
    public void onApplicationEvent(ReactiveWebServerInitializedEvent event) {
        // The port is only known once the server is up, so the member id cannot be built earlier.
        this.port = event.getWebServer().getPort();
        String membershipPath = ZKPaths.makePath("/kafka-graphs", "group");
        // NOTE(review): the GroupMember is started here but never closed anywhere in this class.
        this.group = new GroupMember(this.curator, membershipPath, getHostAndPort());
        this.group.start();
    }

    /**
     * Imports vertex and/or edge files from a multipart request via {@link GraphImporter}.
     *
     * <p>Recognised parts (all optional as far as this method is concerned):
     * form fields {@code verticesTopic}, {@code edgesTopic}, {@code vertexParser},
     * {@code edgeParser}, {@code keySerializer}, {@code vertexValueSerializer},
     * {@code edgeValueSerializer}, {@code numPartitions} (default 50),
     * {@code replicationFactor} (default 1); file parts {@code vertexFile}, {@code edgeFile}.
     *
     * <p>Uploaded files are stored in {@code ClientUtils.tempDirectory()} under the base name of
     * the client-supplied filename only, so a filename containing directory components cannot
     * escape that directory. The {@code Mono} returned by {@code FilePart.transferTo} is chained
     * ahead of the import, so the files are written before the importer reads them even when the
     * transfer is performed lazily.
     *
     * <p>NOTE(review): {@code importer.call()} still runs on the subscribing thread; if it
     * blocks, consider moving it to a scheduler meant for blocking work.
     *
     * @param request the multipart request
     * @return 200 with an empty body on success; the returned Mono fails with a
     *         {@link ResponseStatusException} carrying 400 for an unparsable number, a part of
     *         the wrong kind or an unusable file name, and 500 (with the cause attached) for
     *         any other failure
     */
    public Mono<ServerResponse> importGraph(ServerRequest request) {
        return request.body(BodyExtractors.toMultipartData()).flatMap(parts -> {
            try {
                Map<String, ?> map = parts.toSingleValueMap();
                // Accumulates the pending file transfers; subscribed before the import runs.
                Mono<Void> uploads = Mono.empty();

                String verticesTopic = formFieldValue(map, "verticesTopic");
                String edgesTopic = formFieldValue(map, "edgesTopic");

                File vertexFile = null;
                FilePart vertexFilePart = uploadedFile(map, "vertexFile");
                if (vertexFilePart != null) {
                    vertexFile = uploadTarget(vertexFilePart);
                    uploads = uploads.then(vertexFilePart.transferTo(vertexFile));
                }
                File edgeFile = null;
                FilePart edgeFilePart = uploadedFile(map, "edgeFile");
                if (edgeFilePart != null) {
                    edgeFile = uploadTarget(edgeFilePart);
                    uploads = uploads.then(edgeFilePart.transferTo(edgeFile));
                }

                String vertexParser = formFieldValue(map, "vertexParser");
                String edgeParser = formFieldValue(map, "edgeParser");
                String keySerializer = formFieldValue(map, "keySerializer");
                String vertexValueSerializer = formFieldValue(map, "vertexValueSerializer");
                String edgeValueSerializer = formFieldValue(map, "edgeValueSerializer");

                int numPartitions = 50;
                String numPartitionsValue = formFieldValue(map, "numPartitions");
                if (numPartitionsValue != null) {
                    numPartitions = Integer.parseInt(numPartitionsValue);
                }
                short replicationFactor = 1;
                String replicationFactorValue = formFieldValue(map, "replicationFactor");
                if (replicationFactorValue != null) {
                    replicationFactor = Short.parseShort(replicationFactorValue);
                }

                GraphImporter importer = new GraphImporter(this.props.getBootstrapServers(), verticesTopic, edgesTopic, vertexFile, edgeFile, vertexParser, edgeParser, keySerializer, vertexValueSerializer, edgeValueSerializer, numPartitions, replicationFactor);
                return uploads
                    .then(Mono.fromCallable(() -> {
                        importer.call();
                        // fromCallable needs a value; the result of call() itself is not used.
                        return Boolean.TRUE;
                    }))
                    .onErrorMap(Exception.class, e -> toImportFailure(e))
                    .then(ServerResponse.ok().build());
            }
            catch (Exception e) {
                throw toImportFailure(e);
            }
        });
    }

    /** Returns the value of the named form field, or null when the part is absent; 400 if it is not a form field. */
    private static String formFieldValue(Map<String, ?> parts, String name) {
        Object part = parts.get(name);
        if (part == null) {
            return null;
        }
        if (!(part instanceof FormFieldPart)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Expected a form field: " + name);
        }
        return ((FormFieldPart) part).value();
    }

    /** Returns the named file part, or null when the part is absent; 400 if it is not a file part. */
    private static FilePart uploadedFile(Map<String, ?> parts, String name) {
        Object part = parts.get(name);
        if (part == null) {
            return null;
        }
        if (!(part instanceof FilePart)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Expected a file: " + name);
        }
        return (FilePart) part;
    }

    /** Maps an upload to a file inside the temp directory, keeping only the base name of the client-supplied filename. */
    private static File uploadTarget(FilePart part) {
        String baseName = new File(part.filename()).getName();
        if (baseName.isEmpty() || ".".equals(baseName) || "..".equals(baseName)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid file name: " + part.filename());
        }
        return new File(ClientUtils.tempDirectory(), baseName);
    }

    /** Translates an import failure into the HTTP error to report, always keeping the cause. */
    private static ResponseStatusException toImportFailure(Exception e) {
        if (e instanceof ResponseStatusException) {
            return (ResponseStatusException) e;
        }
        if (e instanceof NumberFormatException) {
            return new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", e);
        }
        return new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Graph import failed", e);
    }

    /**
     * Builds a graph from the initial edges topic (and, when given, the initial vertices
     * topic) and hands it to {@code GraphUtils.groupEdgesBySourceAndRepartition}, which writes to
     * the requested vertices and edges-grouped-by-source topics. A fresh random application id
     * is used for the streams configuration of every call.
     *
     * <p>The vertices table is materialized with the same serdes it is consumed with (key serde
     * and vertex value serde). Previously it was materialized with a Kryo key serde and the
     * <em>edge</em> value serde, which does not match the data read from the vertices topic.
     *
     * <p>Raw types are kept on purpose: the generic signatures of the project types used here
     * are not visible from this file.
     *
     * <p>NOTE(review): for non-async requests {@code future.get()} blocks the subscribing thread.
     *
     * @param request request whose body is a {@link GroupEdgesBySourceRequest}
     * @return 200 with an empty body; the returned Mono fails with a 500
     *         {@link ResponseStatusException} (cause attached) when preparation fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Mono<ServerResponse> prepareGraph(ServerRequest request) {
        String appId = ClientUtils.generateRandomHexString(8);
        return request.bodyToMono(GroupEdgesBySourceRequest.class).doOnNext(input -> {
            try {
                KGraph graph;
                GraphAlgorithmType type = input.getAlgorithm();
                StreamsBuilder builder = new StreamsBuilder();
                GraphSerialized serialized = GraphAlgorithmType.graphSerialized(type, input.isValuesOfTypeDouble());
                Properties streamsConfig = GraphAlgorithmHandler.streamsConfig(appId, this.props.getBootstrapServers(), (Serde)serialized.keySerde(), (Serde)serialized.vertexValueSerde());
                KTable edges = builder.table(input.getInitialEdgesTopic(), Consumed.with((Serde)new KryoSerde(), (Serde)serialized.edgeValueSerde()), Materialized.with((Serde)new KryoSerde(), (Serde)serialized.edgeValueSerde()));
                if (input.getInitialVerticesTopic() != null) {
                    // Store serdes must agree with the serdes used to read the topic.
                    KTable vertices = builder.table(input.getInitialVerticesTopic(), Consumed.with((Serde)serialized.keySerde(), (Serde)serialized.vertexValueSerde()), Materialized.with((Serde)serialized.keySerde(), (Serde)serialized.vertexValueSerde()));
                    graph = new KGraph(vertices, edges, serialized);
                } else {
                    // No vertices topic: derive the vertices from the edges with the algorithm's initial value.
                    graph = KGraph.fromEdges((KTable)edges, (ValueMapper)GraphAlgorithmType.initialVertexValueMapper(type), (GraphSerialized)serialized);
                }
                CompletableFuture<?> future = GraphUtils.groupEdgesBySourceAndRepartition(builder, streamsConfig, graph, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), (int)input.getNumPartitions(), (short)input.getReplicationFactor());
                if (!input.isAsync()) {
                    future.get();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Interrupted while preparing graph", e);
            }
            catch (Exception e) {
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to prepare graph", e);
            }
        }).then(ServerResponse.ok().build());
    }

    /**
     * Creates and configures a Pregel algorithm on this node, registers it under its
     * application id and, when the request did not carry an {@code X-KGraph-AppId} header,
     * forwards the same request to the other group members.
     *
     * @param request request whose body is a {@link GraphAlgorithmCreateRequest}
     * @return 200 with a JSON {@link GraphAlgorithmId} holding the application id
     */
    public Mono<ServerResponse> configure(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header("X-KGraph-AppId");
        // A present header means another node already chose the id and is proxying to us.
        String appId;
        if (appIdHeaders.isEmpty()) {
            appId = ClientUtils.generateRandomHexString(8);
        } else {
            appId = appIdHeaders.iterator().next();
        }
        return request.bodyToMono(GraphAlgorithmCreateRequest.class)
            .doOnNext(input -> {
                PregelGraphAlgorithm<?, ?, ?, ?> created = this.getAlgorithm(appId, input);
                StreamsBuilder topology = new StreamsBuilder();
                Properties settings = GraphAlgorithmHandler.streamsConfig(appId, this.props.getBootstrapServers(), created.serialized().keySerde(), created.serialized().vertexValueSerde());
                created.configure(topology, settings);
                this.algorithms.put(appId, created);
            })
            .flatMapMany(input -> {
                // Only the node that received the original request fans it out.
                Set<String> targets = appIdHeaders.isEmpty()
                    ? this.group.getCurrentMembers().keySet()
                    : Collections.<String>emptySet();
                return this.proxyConfigure(targets, appId, input);
            })
            .then(ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(new GraphAlgorithmId(appId)), GraphAlgorithmId.class));
    }

    /**
     * Builds the {@link PregelGraphAlgorithm} for the algorithm type named in the request,
     * translating the request's string configs into the typed config map handed to the
     * algorithm.
     *
     * @param appId application id passed to the created algorithm
     * @param input create request (algorithm type, topics, partitioning, string configs)
     * @return the newly constructed, not yet configured, algorithm
     * @throws ResponseStatusException 400 when a required config is missing, a numeric config
     *         cannot be parsed, or the algorithm type matches none of the cases
     */
    private PregelGraphAlgorithm<?, ?, ?, ?> getAlgorithm(String appId, GraphAlgorithmCreateRequest input) {
        try {
            GraphAlgorithmType type = input.getAlgorithm();
            ComputeFunction cf = GraphAlgorithmType.computeFunction((GraphAlgorithmType)type);
            HashMap<String, Object> configs = new HashMap<String, Object>();
            // Stays empty unless a case below sets an initial message (only case 6 does).
            Optional<Object> initMsg = Optional.empty();
            GraphSerialized graphSerialized = GraphAlgorithmType.graphSerialized((GraphAlgorithmType)type, (boolean)input.isValuesOfTypeDouble());
            // NOTE(review): decompiler artifact of a switch over the GraphAlgorithmType enum. The case
            // numbers look like indices into a compiler-generated lookup table, not enum ordinals;
            // which constant each case stands for is not visible in this file — confirm against the
            // original source before editing.
            switch (1.$SwitchMap$io$kgraph$library$GraphAlgorithmType[type.ordinal()]) {
                case 1: {
                    // Requires "srcVertexId" (parsed as long).
                    long srcVertexId = Long.parseLong(this.getConfig(input.getConfigs(), "srcVertexId", true));
                    configs.put("srcVertexId", srcVertexId);
                    break;
                }
                // Cases 2-4 take no configs.
                case 2: {
                    break;
                }
                case 3: {
                    break;
                }
                case 4: {
                    break;
                }
                case 5: {
                    // Requires "landmarkVertexIds": comma-separated longs. Entries are not trimmed, so
                    // whitespace around an id fails parsing and is reported as 400.
                    String[] values = this.getConfig(input.getConfigs(), "landmarkVertexIds", true).split(",");
                    Set landmarkVertexIds = Arrays.stream(values).map(Long::parseLong).collect(Collectors.toSet());
                    configs.put("landmarkVertexIds", landmarkVertexIds);
                    break;
                }
                case 6: {
                    // Requires "tolerance" and "resetProbability"; "srcVertexId" is optional.
                    double tolerance = Double.parseDouble(this.getConfig(input.getConfigs(), "tolerance", true));
                    double resetProbability = Double.parseDouble(this.getConfig(input.getConfigs(), "resetProbability", true));
                    String srcVertexIdStr = this.getConfig(input.getConfigs(), "srcVertexId", false);
                    configs.put("tolerance", tolerance);
                    configs.put("resetProbability", resetProbability);
                    if (srcVertexIdStr != null) {
                        // With a source vertex the initial message is 0.0.
                        configs.put("srcVertexId", Long.parseLong(srcVertexIdStr));
                        initMsg = Optional.of(0.0);
                        break;
                    }
                    // NOTE(review): resetProbability is not range-checked; a value of 1.0 makes this
                    // division produce Infinity.
                    initMsg = Optional.of(resetProbability / (1.0 - resetProbability));
                    break;
                }
                case 7: {
                    // Requires "srcVertexId" (parsed as long), same as case 1.
                    long srcVertexId2 = Long.parseLong(this.getConfig(input.getConfigs(), "srcVertexId", true));
                    configs.put("srcVertexId", srcVertexId2);
                    break;
                }
                case 8: {
                    // Every config in this case is optional; the literal after ':' is the default used
                    // when the key is absent.
                    String biasLambdaStr = this.getConfig(input.getConfigs(), "lambda.bias", false);
                    float biasLambda = biasLambdaStr != null ? Float.parseFloat(biasLambdaStr) : 0.005f;
                    String biasGammaStr = this.getConfig(input.getConfigs(), "gamma.bias", false);
                    float biasGamma = biasGammaStr != null ? Float.parseFloat(biasGammaStr) : 0.01f;
                    String factorLambdaStr = this.getConfig(input.getConfigs(), "lambda.factor", false);
                    float factorLambda = factorLambdaStr != null ? Float.parseFloat(factorLambdaStr) : 0.005f;
                    String factorGammaStr = this.getConfig(input.getConfigs(), "gamma.factor", false);
                    float factorGamma = factorGammaStr != null ? Float.parseFloat(factorGammaStr) : 0.01f;
                    String minRatingStr = this.getConfig(input.getConfigs(), "min.rating", false);
                    float minRating = minRatingStr != null ? Float.parseFloat(minRatingStr) : 0.0f;
                    String maxRatingStr = this.getConfig(input.getConfigs(), "max.rating", false);
                    float maxRating = maxRatingStr != null ? Float.parseFloat(maxRatingStr) : 5.0f;
                    String vectorSizeStr = this.getConfig(input.getConfigs(), "dim", false);
                    int vectorSize = vectorSizeStr != null ? Integer.parseInt(vectorSizeStr) : 2;
                    String randomSeedStr = this.getConfig(input.getConfigs(), "random.seed", false);
                    // No default seed: stays null when absent and is stored as a null map value below.
                    Long randomSeed = randomSeedStr != null ? Long.valueOf(Long.parseLong(randomSeedStr)) : null;
                    String iterationsStr = this.getConfig(input.getConfigs(), "iterations", false);
                    int iterations = iterationsStr != null ? Integer.parseInt(iterationsStr) : Integer.MAX_VALUE;
                    configs.put("lambda.bias", Float.valueOf(biasLambda));
                    configs.put("gamma.bias", Float.valueOf(biasGamma));
                    configs.put("lambda.factor", Float.valueOf(factorLambda));
                    configs.put("gamma.factor", Float.valueOf(factorGamma));
                    configs.put("min.rating", Float.valueOf(minRating));
                    configs.put("max.rating", Float.valueOf(maxRating));
                    configs.put("dim", vectorSize);
                    configs.put("random.seed", randomSeed);
                    configs.put("iterations", iterations);
                    break;
                }
                default: {
                    throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid algorithm: " + type);
                }
            }
            return new PregelGraphAlgorithm(this.getHostAndPort(), appId, this.props.getBootstrapServers(), this.curator, input.getVerticesTopic(), input.getEdgesGroupedBySourceTopic(), Collections.emptyMap(), graphSerialized, input.getNumPartitions(), input.getReplicationFactor(), configs, initMsg, cf);
        }
        // Only number-parsing failures are translated here; the ResponseStatusExceptions thrown
        // above and by getConfig propagate unchanged.
        catch (NumberFormatException e) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", (Throwable)e);
        }
    }

    /**
     * Looks up a single config value.
     *
     * @param configs    request configs; may be null, which is treated as "no configs"
     * @param key        config key to read
     * @param isRequired whether a missing value is an error
     * @return the configured value, or null when it is absent and not required
     * @throws ResponseStatusException 400 ("Missing param: key") when a required value is absent
     */
    private String getConfig(Map<String, String> configs, String key, boolean isRequired) {
        String value = configs != null ? configs.get(key) : null;
        if (isRequired && value == null) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing param: " + key);
        }
        return value;
    }

    /**
     * Forwards a create request to every group member except this node by POSTing it to
     * {@code /pregel} with the application id in the {@code X-KGraph-AppId} header.
     *
     * @param groupMembers member ids ({@code host:port}) to consider
     * @param appId        application id to propagate
     * @param input        the create request to forward
     * @return the ids returned by the contacted members
     */
    private Flux<GraphAlgorithmId> proxyConfigure(Set<String> groupMembers, String appId, GraphAlgorithmCreateRequest input) {
        return Flux.fromIterable(groupMembers)
            .filter(member -> !member.equals(this.getHostAndPort()))
            .flatMap(member -> {
                log.debug("proxy configure to {}", member);
                return WebClient.create("http://" + member)
                    .post()
                    .uri("/pregel")
                    .accept(MediaType.APPLICATION_JSON)
                    .header("X-KGraph-AppId", appId)
                    .body(Mono.just(input), GraphAlgorithmCreateRequest.class)
                    .retrieve()
                    .bodyToMono(GraphAlgorithmId.class);
            });
    }

    /**
     * Reports the state of the algorithm registered on this node under the {@code id} path
     * variable.
     *
     * @param request request carrying the {@code id} path variable
     * @return 200 with a JSON {@link GraphAlgorithmStatus}, or 404 when the id is unknown here
     */
    public Mono<ServerResponse> state(ServerRequest request) {
        PregelGraphAlgorithm<?, ?, ?, ?> algorithm = this.algorithms.get(request.pathVariable("id"));
        if (algorithm != null) {
            GraphAlgorithmStatus status = new GraphAlgorithmStatus(algorithm.state());
            return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(status), GraphAlgorithmStatus.class);
        }
        return ServerResponse.notFound().build();
    }

    /**
     * Starts the algorithm registered under the {@code id} path variable and, when the request
     * did not carry an {@code X-KGraph-AppId} header, asks the other group members to do the
     * same. The status produced by this node is the one returned.
     *
     * <p>An unknown id is detected with an explicit check instead of relying on a
     * {@code NullPointerException}. Other runtime failures are still reported as 404, as before,
     * but now keep their cause; failures that already are a {@link ResponseStatusException}
     * (for example an unreadable request body) are passed through with their own status
     * instead of being rewritten to 404.
     *
     * @param request request whose body is a {@link GraphAlgorithmRunRequest}
     * @return 200 with a JSON {@link GraphAlgorithmStatus}; the returned Mono fails with 404
     *         when the id is unknown on this node
     */
    public Mono<ServerResponse> run(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header("X-KGraph-AppId");
        String appId = request.pathVariable("id");
        return request.bodyToMono(GraphAlgorithmRunRequest.class)
            .flatMapMany(input -> {
                log.debug("num iterations: {}", (Object)input.getNumIterations());
                PregelGraphAlgorithm<?, ?, ?, ?> algorithm = this.algorithms.get(appId);
                if (algorithm == null) {
                    throw new ResponseStatusException(HttpStatus.NOT_FOUND);
                }
                GraphAlgorithmStatus status = new GraphAlgorithmStatus(algorithm.run(input.getNumIterations()));
                // Only the node that received the original request fans it out.
                Set<String> targets = appIdHeaders.isEmpty()
                    ? this.group.getCurrentMembers().keySet()
                    : Collections.<String>emptySet();
                Flux<GraphAlgorithmStatus> remoteStates = this.proxyRun(targets, appId, input);
                return Mono.just(status).mergeWith(remoteStates);
            })
            .onErrorMap(
                e -> e instanceof RuntimeException && !(e instanceof ResponseStatusException),
                e -> new ResponseStatusException(HttpStatus.NOT_FOUND, "Failed to run algorithm", e))
            // Keep the first status only; the rest of the stream is still consumed.
            .reduce((first, ignored) -> first)
            .flatMap(state -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(state), GraphAlgorithmStatus.class));
    }

    /**
     * Forwards a run request to every group member except this node by POSTing it to
     * {@code /pregel/{id}} with the application id in the {@code X-KGraph-AppId} header.
     *
     * @param groupMembers member ids ({@code host:port}) to consider
     * @param appId        application id of the algorithm to run
     * @param input        the run request to forward
     * @return the statuses returned by the contacted members
     */
    private Flux<GraphAlgorithmStatus> proxyRun(Set<String> groupMembers, String appId, GraphAlgorithmRunRequest input) {
        return Flux.fromIterable(groupMembers)
            .filter(member -> !member.equals(this.getHostAndPort()))
            .flatMap(member -> {
                log.debug("proxy run to {}", member);
                return WebClient.create("http://" + member)
                    .post()
                    .uri("/pregel/{id}", appId)
                    .accept(MediaType.APPLICATION_JSON)
                    .header("X-KGraph-AppId", appId)
                    .body(Mono.just(input), GraphAlgorithmRunRequest.class)
                    .retrieve()
                    .bodyToMono(GraphAlgorithmStatus.class);
            });
    }

    /**
     * Returns the configs of the algorithm registered on this node under the {@code id} path
     * variable.
     *
     * @param request request carrying the {@code id} path variable
     * @return 200 with the configs as JSON, or 404 when the id is unknown here
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Mono<ServerResponse> configs(ServerRequest request) {
        PregelGraphAlgorithm<?, ?, ?, ?> algorithm = this.algorithms.get(request.pathVariable("id"));
        if (algorithm != null) {
            // Raw Map on purpose: the body is declared with Map.class as its element type.
            Mono<Map> payload = Mono.just(algorithm.configs());
            return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(payload, Map.class);
        }
        return ServerResponse.notFound().build();
    }

    /**
     * Streams the results of the algorithm registered under the {@code id} path variable as
     * server-sent events. When the request did not carry an {@code X-KGraph-AppId} header, the
     * results of the other group members are merged into the same stream.
     *
     * @param request request carrying the {@code id} path variable
     * @return 200 with an event stream of {@link KeyValue}, or 404 when the id is unknown here
     */
    public Mono<ServerResponse> result(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header("X-KGraph-AppId");
        String appId = request.pathVariable("id");
        PregelGraphAlgorithm<?, ?, ?, ?> algorithm = this.algorithms.get(appId);
        if (algorithm == null) {
            return ServerResponse.notFound().build();
        }
        // Keys and values are rendered with toString() for the wire format.
        Flux<KeyValue> localResults = Flux.fromIterable(algorithm.result()).map(entry -> {
            log.trace("result: ({}, {})", entry.key, entry.value);
            return new KeyValue(entry.key.toString(), entry.value.toString());
        });
        // Only the node that received the original request fans it out.
        Set<String> targets = appIdHeaders.isEmpty()
            ? this.group.getCurrentMembers().keySet()
            : Collections.<String>emptySet();
        Flux<KeyValue> allResults = this.proxyResult(targets, appId, localResults);
        return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(BodyInserters.fromPublisher(allResults, KeyValue.class));
    }

    /**
     * Merges into {@code body} the result streams fetched with GET
     * {@code /pregel/{id}/result} from every group member except this node.
     *
     * @param groupMembers member ids ({@code host:port}) to consider
     * @param appId        application id of the algorithm
     * @param body         this node's own results
     * @return {@code body} merged with the remote result streams
     */
    private Flux<KeyValue> proxyResult(Set<String> groupMembers, String appId, Flux<KeyValue> body) {
        Flux<KeyValue> merged = body;
        for (String member : groupMembers) {
            if (member.equals(this.getHostAndPort())) {
                continue;
            }
            log.debug("proxy result to {}", member);
            Flux<KeyValue> remote = WebClient.create("http://" + member)
                .get()
                .uri("/pregel/{id}/result", appId)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .header("X-KGraph-AppId", appId)
                .retrieve()
                .bodyToFlux(KeyValue.class);
            merged = merged.mergeWith(remote);
        }
        return merged;
    }

    /**
     * Streams, as server-sent events, only those results whose key (rendered with
     * {@code toString()}) equals the key named in the request body. When the request did not
     * carry an {@code X-KGraph-AppId} header, the matching results of the other group members
     * are merged into the same stream.
     *
     * @param request request carrying the {@code id} path variable and a
     *                {@link GraphAlgorithmResultRequest} body
     * @return 200 with an event stream of {@link KeyValue}, or 404 when the id is unknown here
     */
    public Mono<ServerResponse> filterResult(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header("X-KGraph-AppId");
        String appId = request.pathVariable("id");
        PregelGraphAlgorithm<?, ?, ?, ?> algorithm = this.algorithms.get(appId);
        if (algorithm == null) {
            return ServerResponse.notFound().build();
        }
        Flux<KeyValue> matches = request.bodyToMono(GraphAlgorithmResultRequest.class).flatMapMany(input -> {
            Flux<KeyValue> localMatches = Flux.fromIterable(algorithm.result())
                .filter(entry -> entry.key.toString().equals(input.getKey()))
                .map(entry -> {
                    log.trace("result: ({}, {})", entry.key, entry.value);
                    return new KeyValue(entry.key.toString(), entry.value.toString());
                });
            // Only the node that received the original request fans it out.
            Set<String> targets = appIdHeaders.isEmpty()
                ? this.group.getCurrentMembers().keySet()
                : Collections.<String>emptySet();
            return this.proxyFilterResult(targets, appId, input, localMatches);
        });
        return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(BodyInserters.fromPublisher(matches, KeyValue.class));
    }

    /**
     * Merges into {@code body} the filtered result streams obtained by POSTing the filter
     * request to {@code /pregel/{id}/result} on every group member except this node.
     *
     * @param groupMembers member ids ({@code host:port}) to consider
     * @param appId        application id of the algorithm
     * @param input        the filter request to forward
     * @param body         this node's own filtered results
     * @return {@code body} merged with the remote result streams
     */
    private Flux<KeyValue> proxyFilterResult(Set<String> groupMembers, String appId, GraphAlgorithmResultRequest input, Flux<KeyValue> body) {
        Flux<KeyValue> merged = body;
        for (String member : groupMembers) {
            if (member.equals(this.getHostAndPort())) {
                continue;
            }
            log.debug("proxy result to {}", member);
            Flux<KeyValue> remote = WebClient.create("http://" + member)
                .post()
                .uri("/pregel/{id}/result", appId)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .header("X-KGraph-AppId", appId)
                .body(Mono.just(input), GraphAlgorithmResultRequest.class)
                .retrieve()
                .bodyToFlux(KeyValue.class);
            merged = merged.mergeWith(remote);
        }
        return merged;
    }

    /**
     * Unregisters and closes the algorithm registered under the {@code id} path variable and,
     * when the request did not carry an {@code X-KGraph-AppId} header, asks the other group
     * members to do the same.
     *
     * <p>An unknown id now yields 404, like the other handlers in this class, instead of
     * failing with a {@code NullPointerException}.
     *
     * @param request request carrying the {@code id} path variable
     * @return 204 once the delete has been propagated, or 404 when the id is unknown here
     */
    public Mono<ServerResponse> delete(ServerRequest request) {
        List<String> appIdHeaders = request.headers().header("X-KGraph-AppId");
        String appId = request.pathVariable("id");
        PregelGraphAlgorithm<?, ?, ?, ?> algorithm = this.algorithms.remove(appId);
        if (algorithm == null) {
            return ServerResponse.notFound().build();
        }
        algorithm.close();
        // Only the node that received the original request fans it out.
        Set<String> targets = appIdHeaders.isEmpty()
            ? this.group.getCurrentMembers().keySet()
            : Collections.<String>emptySet();
        return this.proxyDelete(targets, appId).then(ServerResponse.noContent().build());
    }

    /**
     * Forwards a delete to every group member except this node by sending DELETE
     * {@code /pregel/{id}} with the application id in the {@code X-KGraph-AppId} header.
     *
     * @param groupMembers member ids ({@code host:port}) to consider
     * @param appId        application id of the algorithm to delete
     * @return a stream that emits no values and ends when the contacted members have answered
     */
    private Flux<Void> proxyDelete(Set<String> groupMembers, String appId) {
        return Flux.fromIterable(groupMembers)
            .filter(member -> !member.equals(this.getHostAndPort()))
            .flatMap(member -> {
                log.debug("proxy delete to {}", member);
                return WebClient.create("http://" + member)
                    .delete()
                    .uri("/pregel/{id}", appId)
                    .accept(MediaType.APPLICATION_JSON)
                    .header("X-KGraph-AppId", appId)
                    .retrieve()
                    .bodyToMono(Void.class);
            });
    }

    /**
     * Builds the Kafka Streams properties used by the handlers in this class.
     *
     * @param appId            value for {@code application.id}; {@code client.id} is derived
     *                         from it by appending {@code -client}
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param keySerde         serde whose class name becomes {@code default.key.serde}
     * @param valueSerde       serde whose class name becomes {@code default.value.serde}
     * @return a new {@link Properties} instance with record caching set to 0 bytes, two stream
     *         threads, and the state directory pointing at {@code ClientUtils.tempDirectory()}
     */
    public static Properties streamsConfig(String appId, String bootstrapServers, Serde<?> keySerde, Serde<?> valueSerde) {
        // Only the serde class names are passed on, not the instances themselves.
        String keySerdeClass = keySerde.getClass().getName();
        String valueSerdeClass = valueSerde.getClass().getName();
        String stateDir = ClientUtils.tempDirectory().getAbsolutePath();

        Properties settings = new Properties();
        settings.put("application.id", appId);
        settings.put("client.id", appId + "-client");
        settings.put("bootstrap.servers", bootstrapServers);
        settings.put("default.key.serde", keySerdeClass);
        settings.put("default.value.serde", valueSerdeClass);
        settings.put("cache.max.bytes.buffering", 0);
        settings.put("num.stream.threads", 2);
        settings.put("state.dir", stateDir);
        return settings;
    }

    /**
     * Returns this node's member id in the form {@code host:port}.
     *
     * @return the host address and the current port joined by a colon
     */
    public String getHostAndPort() {
        StringBuilder id = new StringBuilder();
        id.append(this.host).append(":").append(this.port);
        return id.toString();
    }

    /**
     * Resolves the address of the local host.
     *
     * @return the textual address reported by {@link InetAddress#getLocalHost()}
     * @throws RuntimeException wrapping the {@link UnknownHostException} when the local host
     *         cannot be resolved
     */
    public String getHostAddress() {
        InetAddress local;
        try {
            local = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        return local.getHostAddress();
    }

    /**
     * Returns the web server port recorded by {@code onApplicationEvent}.
     *
     * @return the port, or 0 if the web-server-initialized event has not been received yet
     */
    public int getPort() {
        return port;
    }
}

