/*
 * Decompiled with CFR 0.152.
 */
package io.muserver.rest;

import io.muserver.ClientDisconnectedException;
import io.muserver.MuException;
import io.muserver.Mutils;
import io.muserver.rest.JaxSseEventSinkImpl;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.SseBroadcaster;
import jakarta.ws.rs.sse.SseEventSink;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class SseBroadcasterImpl
implements SseBroadcaster {
    private volatile boolean isClosed = false;
    private final List<BiConsumer<SseEventSink, Throwable>> errorListeners = new CopyOnWriteArrayList<BiConsumer<SseEventSink, Throwable>>();
    private final List<Consumer<SseEventSink>> closeListeners = new CopyOnWriteArrayList<Consumer<SseEventSink>>();
    private final List<SseEventSink> sinks = new CopyOnWriteArrayList<SseEventSink>();

    SseBroadcasterImpl() {
    }

    public void onError(BiConsumer<SseEventSink, Throwable> onError) {
        Mutils.notNull("onError", onError);
        this.throwIfClosed();
        this.errorListeners.add(onError);
    }

    public void onClose(Consumer<SseEventSink> onClose) {
        Mutils.notNull("onClose", onClose);
        this.throwIfClosed();
        this.closeListeners.add(onClose);
    }

    public void register(SseEventSink sseEventSink) {
        Mutils.notNull("sseEventSink", sseEventSink);
        this.throwIfClosed();
        this.sinks.add(sseEventSink);
        if (sseEventSink instanceof JaxSseEventSinkImpl) {
            ((JaxSseEventSinkImpl)sseEventSink).setResponseCompleteHandler(info -> {
                if (!info.completedSuccessfully()) {
                    Exception ex;
                    switch (info.response().responseState()) {
                        case CLIENT_DISCONNECTED: {
                            ex = new ClientDisconnectedException();
                            break;
                        }
                        case TIMED_OUT: {
                            ex = new TimeoutException();
                            break;
                        }
                        default: {
                            ex = new MuException("Generic error");
                        }
                    }
                    this.onSinkErrored(sseEventSink, ex);
                }
            });
        }
    }

    public CompletionStage<?> broadcast(OutboundSseEvent event) {
        Mutils.notNull("event", event);
        this.throwIfClosed();
        CompletableFuture completableFuture = new CompletableFuture();
        AtomicInteger count = new AtomicInteger(this.sinks.size());
        for (SseEventSink sink : this.sinks) {
            if (sink.isClosed()) {
                this.sinks.remove(sink);
                this.sendOnCloseEvent(sink);
                SseBroadcasterImpl.sendComplete(completableFuture, count);
                continue;
            }
            sink.send(event).whenComplete((o, throwable) -> {
                if (throwable != null) {
                    this.onSinkErrored(sink, (Throwable)throwable);
                }
                SseBroadcasterImpl.sendComplete(completableFuture, count);
            });
        }
        return completableFuture;
    }

    private void onSinkErrored(SseEventSink sink, Throwable throwable) {
        boolean wasInList = this.sinks.remove(sink);
        if (wasInList) {
            try {
                sink.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            for (BiConsumer<SseEventSink, Throwable> errorListener : this.errorListeners) {
                errorListener.accept(sink, throwable);
            }
        }
    }

    private static void sendComplete(CompletableFuture<?> completableFuture, AtomicInteger count) {
        int remaining = count.decrementAndGet();
        if (remaining == 0) {
            completableFuture.complete(null);
        }
    }

    public void close() {
        if (!this.isClosed) {
            for (SseEventSink sink : this.sinks) {
                try {
                    sink.close();
                    this.sendOnCloseEvent(sink);
                }
                catch (Exception exception) {}
            }
            this.sinks.clear();
            this.isClosed = true;
        }
    }

    private void sendOnCloseEvent(SseEventSink sink) {
        for (Consumer<SseEventSink> closeListener : this.closeListeners) {
            closeListener.accept(sink);
        }
    }

    private void throwIfClosed() {
        if (this.isClosed) {
            throw new IllegalStateException("This broadcaster has already been closed");
        }
    }

    public int connectedSinksCount() {
        return this.sinks.size();
    }
}

