/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.http.resource.sse;

import io.vlingo.actors.Actor;
import io.vlingo.actors.ActorInstantiator;
import io.vlingo.actors.ActorInstantiatorRegistry;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Stoppable;
import io.vlingo.actors.World;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.http.Header;
import io.vlingo.http.Method;
import io.vlingo.http.Request;
import io.vlingo.http.Response;
import io.vlingo.http.ResponseHeader;
import io.vlingo.http.resource.ResourceHandler;
import io.vlingo.http.resource.sse.SseClient;
import io.vlingo.http.resource.sse.SseFeed;
import io.vlingo.http.resource.sse.SsePublisher;
import io.vlingo.http.resource.sse.SseSubscriber;
import io.vlingo.wire.channel.RequestResponseContext;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Resource handler that subscribes HTTP clients to, and unsubscribes them from,
 * named event streams served by {@link SsePublisher} actors.
 *
 * <p>One publisher is kept per stream name in a static map, so all instances of
 * this resource in the same class loader share the same publishers.
 */
public class SseStreamResource extends ResourceHandler {
    /** Publishers keyed by stream name; static, hence shared across resource instances. */
    private static final Map<String, SsePublisher> publishers = new ConcurrentHashMap<String, SsePublisher>();

    /** World used to create publisher actors and to obtain a logger. */
    private final World world;

    /**
     * Creates the resource.
     *
     * @param world the {@link World} in which publisher actors are created
     */
    public SseStreamResource(World world) {
        this.world = world;
    }

    /**
     * Subscribes the client of the current request to the stream named {@code streamName},
     * creating the stream's publisher first when none is registered yet.
     *
     * <p>The request built by {@link #unsubscribeRequest()} is registered with the client
     * context through {@code whenClosing(...)}. The {@code X-Correlation-ID} and
     * {@code Last-Event-ID} request headers are read, each defaulting to the empty string.
     *
     * @param streamName    the name of the stream to subscribe to
     * @param feedClass     the actor class of the feed, handed to the publisher's instantiator
     * @param feedPayload   handed to the feed's instantiator under the key {@code "feedPayload"}
     * @param feedInterval  the interval at which the publisher is signalled
     *                      (NOTE(review): presumably milliseconds - confirm against the scheduler)
     * @param feedDefaultId handed to the feed's instantiator under the key {@code "feedDefaultId"}
     */
    public void subscribeToStream(String streamName, Class<? extends Actor> feedClass, int feedPayload, int feedInterval, String feedDefaultId) {
        RequestResponseContext<?> clientContext = this.context().clientContext();

        // May be null when the unsubscribe request could not be built; that case is
        // logged by unsubscribeRequest() and passed on unchanged, as before.
        clientContext.whenClosing(this.unsubscribeRequest());

        Request request = this.context().request();
        String correlationId = request.headerValueOr("X-Correlation-ID", "");
        Header.Headers<ResponseHeader> headers = ResponseHeader.headers(ResponseHeader.correlationId(correlationId));
        String lastEventId = request.headerValueOr("Last-Event-ID", "");

        SseClient client = new SseClient(clientContext, headers);
        SseSubscriber subscriber = new SseSubscriber(streamName, client, correlationId, lastEventId);

        this.publisherFor(streamName, feedClass, feedPayload, feedInterval, feedDefaultId).subscribe(subscriber);
    }

    /**
     * Unsubscribes the client of the current request from the stream named
     * {@code streamName}, if a publisher for that stream exists, and then completes
     * the request with an {@code Ok} response in either case.
     *
     * @param streamName the name of the stream to unsubscribe from
     * @param id         not used by this method; the subscriber is built from the
     *                   current request's client context
     *                   (NOTE(review): confirm this is intended)
     */
    public void unsubscribeFromStream(String streamName, String id) {
        SsePublisher publisher = publishers.get(streamName);
        if (publisher != null) {
            SseClient client = new SseClient(this.context().clientContext());
            publisher.unsubscribe(new SseSubscriber(streamName, client));
        }
        this.completes().with(Response.of(Response.Status.Ok));
    }

    /**
     * Answers the publisher registered for {@code streamName}, creating and registering
     * one when none is present.
     *
     * <p>If another caller registers a publisher for the same name between the lookup
     * and the {@code putIfAbsent}, the publisher created here is stopped and the
     * already-registered one is answered instead.
     */
    private SsePublisher publisherFor(String streamName, Class<? extends Actor> feedClass, int feedPayload, int feedInterval, String feedDefaultId) {
        SsePublisher existing = publishers.get(streamName);
        if (existing != null) {
            return existing;
        }

        SsePublisher created = this.world.actorFor(
                SsePublisher.class,
                Definition.has(
                        SsePublisherActor.class,
                        new SsePublisher.SsePublisherInstantiator(streamName, feedClass, feedPayload, feedInterval, feedDefaultId)));

        SsePublisher winner = publishers.putIfAbsent(streamName, created);
        if (winner == null) {
            return created;
        }

        // Lost the race: discard the publisher created here and use the registered one.
        created.stop();
        return winner;
    }

    /**
     * Builds the {@code DELETE} request whose path is the current request's path
     * followed by {@code "/"} and the client context id.
     *
     * @return the request, or {@code null} when it could not be built; the failure
     *         is logged as a warning rather than silently dropped
     */
    private Request unsubscribeRequest() {
        try {
            String unsubscribePath = this.context().request().uri.getPath() + "/" + this.context().clientContext().id();
            return Request.has(Method.DELETE).and(new URI(unsubscribePath));
        }
        catch (Exception e) {
            // Best effort: keep answering null, but leave a trace of why no
            // unsubscribe request is available.
            this.world.defaultLogger().warn("SseStreamResource: cannot create unsubscribe request because: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * Actor behind {@link SsePublisher}: holds the subscribers of one stream and, on
     * every scheduled interval signal, hands them to the stream's {@link SseFeed}.
     */
    public static class SsePublisherActor
    extends Actor
    implements SsePublisher,
    Scheduled<Object>,
    Stoppable {
        /** Handle of the scheduled interval signal; cancelled in {@link #stop()}. */
        private final Cancellable cancellable;
        /** Feed that is given the current subscribers on each interval signal. */
        private final SseFeed feed;
        private final String streamName;
        /** Current subscribers keyed by {@code SseSubscriber.id()}. */
        private final Map<String, SseSubscriber> subscribers;

        /**
         * Creates the publisher, its feed actor, and the interval schedule.
         *
         * @param streamName    the name of the stream served by this publisher
         * @param feedClass     the actor class of the feed; an instantiator must be
         *                      registered for it in {@link ActorInstantiatorRegistry}
         * @param feedPayload   set on the feed's instantiator as {@code "feedPayload"}
         * @param feedInterval  the interval passed to the scheduler, after an initial delay of 10
         * @param feedDefaultId set on the feed's instantiator as {@code "feedDefaultId"}
         * @throws IllegalArgumentException if no instantiator is registered for {@code feedClass}
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public SsePublisherActor(String streamName, Class<? extends Actor> feedClass, int feedPayload, int feedInterval, String feedDefaultId) {
            this.streamName = streamName;
            this.subscribers = new HashMap<String, SseSubscriber>();

            ActorInstantiator instantiator = ActorInstantiatorRegistry.instantiatorFor(feedClass);
            if (instantiator == null) {
                throw new IllegalArgumentException("No ActorInstantiator registred for feedClass=" + feedClass.toString());
            }
            instantiator.set("feedClass", feedClass);
            instantiator.set("streamName", streamName);
            instantiator.set("feedPayload", feedPayload);
            instantiator.set("feedDefaultId", feedDefaultId);

            this.feed = (SseFeed) this.stage().actorFor(SseFeed.class, Definition.has(feedClass, instantiator));
            this.cancellable = this.stage().scheduler().schedule((Scheduled) this.selfAs(Scheduled.class), null, 10L, feedInterval);
            this.logger().info("SsePublisher started for: " + this.streamName);
        }

        /** Registers {@code subscriber} under its id, replacing any entry with the same id. */
        @Override
        public void subscribe(SseSubscriber subscriber) {
            this.subscribers.put(subscriber.id(), subscriber);
        }

        /**
         * Removes the subscriber registered under {@code subscriber.id()} and closes it;
         * does nothing when no such subscriber is registered.
         */
        @Override
        public void unsubscribe(SseSubscriber subscriber) {
            SseSubscriber registered = this.subscribers.remove(subscriber.id());
            if (registered != null) {
                registered.close();
            }
        }

        /** On each scheduled signal, passes the current subscribers to the feed. */
        @Override
        public void intervalSignal(Scheduled<Object> scheduled, Object data) {
            this.feed.to(this.subscribers.values());
        }

        /** Cancels the interval schedule, unsubscribes every subscriber, then stops the actor. */
        @Override
        public void stop() {
            this.cancellable.cancel();
            this.unsubscribeAll();
            super.stop();
        }

        /** Unsubscribes, and thereby closes, every registered subscriber. */
        private void unsubscribeAll() {
            // Iterate over a snapshot because unsubscribe() removes entries from the map.
            Collection<SseSubscriber> current = this.subscribers.values();
            SseSubscriber[] snapshot = current.toArray(new SseSubscriber[current.size()]);
            for (SseSubscriber subscriber : snapshot) {
                this.unsubscribe(subscriber);
            }
        }
    }
}

