/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.mcp.server.sse.client;

import java.io.EOFException;
import java.net.ConnectException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import org.jboss.logging.Logger;

public abstract class SseClient {
    private static final Logger LOG = Logger.getLogger(SseClient.class);
    protected final URI connectUri;

    public SseClient(URI uri) {
        this.connectUri = uri;
    }

    protected abstract void process(SseEvent var1);

    protected void connectionFailed() {
    }

    public CompletableFuture<HttpResponse<Void>> connect() {
        return this.connect(null, Map.of());
    }

    public CompletableFuture<HttpResponse<Void>> connect(Map<String, String> headers) {
        return this.connect(null, headers);
    }

    public CompletableFuture<HttpResponse<Void>> connect(HttpClient client, Map<String, String> headers) {
        if (client == null) {
            client = HttpClient.newHttpClient();
        }
        HttpRequest.Builder builder = HttpRequest.newBuilder().uri(this.connectUri).version(HttpClient.Version.HTTP_1_1).header("Accept", "text/event-stream").GET();
        headers.forEach(builder::header);
        HttpRequest request = builder.build();
        return client.sendAsync(request, HttpResponse.BodyHandlers.fromLineSubscriber(new SseEventSubscriber())).exceptionally(e -> {
            if (e instanceof CompletionException) {
                e = e.getCause();
            }
            if (e instanceof ConnectException) {
                ConnectException ce = (ConnectException)e;
                LOG.errorf(ce.getCause(), "Connection failed: %s", (Object)this.connectUri);
                this.connectionFailed();
            } else {
                Throwable root = SseClient.getRootCause(e);
                if (!(root instanceof EOFException)) {
                    LOG.error(e);
                }
            }
            return null;
        });
    }

    private static Throwable getRootCause(Throwable exception) {
        ArrayList<Throwable> chain = new ArrayList<Throwable>();
        for (Throwable curr = exception; curr != null && !chain.contains(curr); curr = curr.getCause()) {
            chain.add(curr);
        }
        return chain.isEmpty() ? null : (Throwable)chain.get(chain.size() - 1);
    }

    class SseEventSubscriber
    implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;
        private String event = "message";
        private StringBuilder dataBuffer = new StringBuilder();

        SseEventSubscriber() {
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        @Override
        public void onNext(String line) {
            LOG.debugf("Received next line:\n%s", (Object)line);
            if (!line.startsWith(":")) {
                if (line.isBlank()) {
                    SseClient.this.process(new SseEvent(this.event, this.dataBuffer.toString()));
                    this.event = "message";
                    this.dataBuffer = new StringBuilder();
                } else if (line.contains(":")) {
                    int colon = line.indexOf(":");
                    String field = line.substring(0, colon).strip();
                    String value = line.substring(colon + 1).strip();
                    this.handleField(field, value);
                } else {
                    this.handleField(line, "");
                }
            }
            this.subscription.request(1L);
        }

        @Override
        public void onError(Throwable t) {
            Throwable root = SseClient.getRootCause(t);
            if (!(root instanceof EOFException)) {
                LOG.errorf(t, "Error in SSE stream", new Object[0]);
            }
        }

        @Override
        public void onComplete() {
            LOG.debug((Object)"SSE stream complete");
        }

        private void handleField(String field, String value) {
            switch (field) {
                case "event": {
                    this.event = value;
                    break;
                }
                case "data": {
                    this.dataBuffer.append(value).append("\n");
                }
            }
        }
    }

    public record SseEvent(String name, String data) {
    }
}

