/*
 * Decompiled with CFR 0.152.
 */
package com.canoo.dp.impl.server.event;

import com.canoo.dp.impl.platform.core.Assert;
import com.canoo.dp.impl.server.client.ClientSessionLifecycleHandler;
import com.canoo.dp.impl.server.context.DolphinContext;
import com.canoo.dp.impl.server.context.DolphinContextProvider;
import com.canoo.dp.impl.server.event.DolphinEvent;
import com.canoo.dp.impl.server.event.ListenerWithFilter;
import com.canoo.platform.core.functional.Subscription;
import com.canoo.platform.remoting.server.event.ClientSessionEventFilter;
import com.canoo.platform.remoting.server.event.MessageEventContext;
import com.canoo.platform.remoting.server.event.MessageListener;
import com.canoo.platform.remoting.server.event.RemotingEventBus;
import com.canoo.platform.remoting.server.event.Topic;
import com.canoo.platform.server.client.ClientSession;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import javax.servlet.http.HttpSession;
import org.apiguardian.api.API;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link RemotingEventBus} that keeps track of the listeners registered
 * per {@link Topic} and of the subscriptions created per Dolphin Platform context.
 *
 * <p>Publishing an event calls the matching listeners that were registered in the current
 * Dolphin Platform context directly and then hands the event to
 * {@link #publishForOtherSessions(DolphinEvent)}. Implementations are expected to route the event
 * back into {@link #triggerEventHandling(DolphinEvent)}, which schedules the remaining listeners
 * in their own contexts via {@code runLater}.
 *
 * <p>All registries are concurrent collections; listener lists are copy-on-write so that they can
 * be iterated while subscriptions are added or removed.
 */
@API(since="0.x", status=API.Status.INTERNAL)
public abstract class AbstractEventBus
implements RemotingEventBus {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractEventBus.class);

    /** Provider used to resolve the current context and contexts by id; set in {@link #init}. */
    private DolphinContextProvider contextProvider;

    /** Registered listeners (with their optional filter) per topic. */
    private final Map<Topic<?>, List<ListenerWithFilter<?>>> topicToListenerMap = new ConcurrentHashMap<>();

    /** Id of the Dolphin Platform context in which each listener was subscribed. */
    private final Map<MessageListener<?>, String> listenerToSessionMap = new ConcurrentHashMap<>();

    /** Open subscriptions per Dolphin Platform context id; used to clean up when a session ends. */
    private final Map<String, List<Subscription>> sessionStore = new ConcurrentHashMap<>();

    /** Set to {@code true} once {@link #init} has completed. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /**
     * Initializes the event bus. Must be called before {@code publish} or {@code subscribe}.
     *
     * @param contextProvider  provider used to look up Dolphin Platform contexts; must not be null
     * @param lifecycleHandler handler on which a session-destroyed listener is registered so that
     *                         all subscriptions of an ended session are removed; must not be null
     */
    public void init(DolphinContextProvider contextProvider, ClientSessionLifecycleHandler lifecycleHandler) {
        Assert.requireNonNull(contextProvider, "contextProvider");
        this.contextProvider = contextProvider;
        Assert.requireNonNull(lifecycleHandler, "lifecycleHandler");
        lifecycleHandler.addSessionDestroyedListener(s -> this.onSessionEnds(s.getId()));
        this.initialized.set(true);
    }

    /**
     * Publishes {@code data} on {@code topic}.
     *
     * <p>The event is tagged with sender metadata (sender type and, when a current context with a
     * client session exists, the client session id and HTTP session id). Listeners subscribed in
     * the current context are invoked synchronously if their filter is absent or accepts the
     * event; afterwards the event is passed to {@link #publishForOtherSessions(DolphinEvent)}.
     *
     * @param topic the topic to publish on
     * @param data  the payload of the event
     * @throws RuntimeException if the event bus has not been initialized
     */
    @Override
    public <T extends Serializable> void publish(Topic<T> topic, T data) {
        this.checkInitialization();
        final DolphinEvent<T> event = new DolphinEvent<T>(topic, System.currentTimeMillis(), data);
        event.addMetadata("sender.type", "dolphinPlatform");

        final DolphinContext currentContext = this.getCurrentContext();
        if (currentContext != null) {
            final ClientSession clientSession = currentContext.getClientSession();
            if (clientSession != null) {
                event.addMetadata("sender.dolphinPlatform.clientSessionId", clientSession.getId());
                final HttpSession httpSession = clientSession.getHttpSession();
                if (httpSession != null) {
                    event.addMetadata("sender.httpSessionId", httpSession.getId());
                }
            }

            // Listeners of the publishing context are served right away on the calling thread.
            for (final ListenerWithFilter<T> listenerAndFilter : this.getListenersForSessionAndTopic(currentContext.getId(), topic)) {
                final Predicate<MessageEventContext<T>> filter = listenerAndFilter.getFilter();
                final MessageListener<T> listener = listenerAndFilter.getListener();
                if (filter == null || filter.test(event.getMessageEventContext())) {
                    listener.onMessage(event);
                }
            }
        }
        this.publishForOtherSessions(event);
    }

    /**
     * Registers {@code listener} for {@code topic} in the current Dolphin Platform context.
     *
     * @param topic    the topic to listen on; must not be null
     * @param listener the listener to call; must not be null
     * @param filter   optional filter; when null every event of the topic is delivered
     * @return a subscription whose {@code unsubscribe()} removes the listener again
     * @throws IllegalStateException if there is no current Dolphin Platform context
     * @throws RuntimeException      if the event bus has not been initialized
     */
    @Override
    public <T extends Serializable> Subscription subscribe(final Topic<T> topic, final MessageListener<? super T> listener, Predicate<MessageEventContext<T>> filter) {
        this.checkInitialization();
        Assert.requireNonNull(topic, "topic");
        Assert.requireNonNull(listener, "listener");
        final DolphinContext subscriptionContext = this.getCurrentContext();
        if (subscriptionContext == null) {
            throw new IllegalStateException("Subscription can only be done from Dolphin Context!");
        }
        final String subscriptionSessionId = subscriptionContext.getId();
        LOG.trace("Adding subscription for topic {} in Dolphin Platform context {}", topic.getName(), subscriptionSessionId);

        final ListenerWithFilter<?> listenerWithFilter = new ListenerWithFilter<T>(listener, filter);
        // Record the owning context before the listener becomes visible in the topic list, so a
        // concurrent dispatch never sees a listener without a context id.
        this.listenerToSessionMap.put(listener, subscriptionSessionId);
        // computeIfAbsent creates the list atomically; a get/put sequence could let two threads
        // each install a fresh list and lose one of the listeners.
        this.topicToListenerMap.computeIfAbsent(topic, key -> new CopyOnWriteArrayList<>()).add(listenerWithFilter);

        // Anonymous class (not a lambda) because unsubscribe() must pass itself on via 'this'.
        final Subscription subscription = new Subscription(){

            @Override
            public void unsubscribe() {
                LOG.trace("Removing subscription for topic {} in Dolphin Platform context {}", topic.getName(), subscriptionSessionId);
                final List<ListenerWithFilter<?>> registered = AbstractEventBus.this.topicToListenerMap.get(topic);
                if (registered != null) {
                    registered.remove(listenerWithFilter);
                }
                AbstractEventBus.this.listenerToSessionMap.remove(listener);
                AbstractEventBus.this.removeSubscriptionForSession(this, subscriptionSessionId);
            }
        };
        this.addSubscriptionForSession(subscription, subscriptionSessionId);
        return subscription;
    }

    /**
     * Registers {@code listener} for {@code topic} without a filter.
     *
     * @see #subscribe(Topic, MessageListener, Predicate)
     */
    @Override
    public <T extends Serializable> Subscription subscribe(Topic<T> topic, MessageListener<? super T> listener) {
        return this.subscribe(topic, listener, null);
    }

    /**
     * Dispatches {@code event} to the listeners of its topic that have not been served yet.
     *
     * <p>A listener for which {@link #sendInSameClientSession} reports {@code true} is skipped
     * (it is treated as already called). Every other listener is scheduled through
     * {@code runLater} on the context it was subscribed in; exceptions thrown by the filter or
     * the listener there are logged and not propagated.
     *
     * @param event the event to dispatch; must not be null
     */
    protected <T extends Serializable> void triggerEventHandling(final DolphinEvent<T> event) {
        Assert.requireNonNull(event, "event");
        final Topic<T> topic = event.getMessageEventContext().getTopic();
        LOG.trace("Handling data for topic {}", topic.getName());
        final List<ListenerWithFilter<?>> listeners = this.topicToListenerMap.get(topic);
        if (listeners == null) {
            return;
        }
        for (final ListenerWithFilter<?> registered : listeners) {
            final String sessionId = this.listenerToSessionMap.get(registered.getListener());
            if (sessionId == null) {
                // The iteration works on a snapshot of the list, so a listener that is being
                // unsubscribed concurrently can still show up here without a context id. Skip it
                // instead of aborting delivery to all remaining listeners.
                LOG.warn("No Dolphin Platform context registered for event listener of topic {}; listener is skipped", topic.getName());
                continue;
            }
            if (this.sendInSameClientSession(event, registered.getListener())) {
                LOG.trace("Event listener for topic {} was already called in Dolphin Platform context {}", topic.getName(), sessionId);
                continue;
            }
            LOG.trace("Event listener for topic {} must be called later in Dolphin Platform context {}", topic.getName(), sessionId);
            final DolphinContext targetContext = this.contextProvider.getContextById(sessionId);
            if (targetContext == null) {
                // NOTE(review): assumes the provider may return null for a context that no longer
                // exists - confirm against DolphinContextProvider. Guarding avoids an NPE that
                // would stop the dispatch loop.
                LOG.warn("Dolphin Platform context {} not found; event listener for topic {} is skipped", sessionId, topic.getName());
                continue;
            }
            // Listeners stored under a topic were registered through subscribe(Topic<T>, ...),
            // so narrowing the wildcard to T is what the registry is built around.
            @SuppressWarnings("unchecked")
            final ListenerWithFilter<T> listenerAndFilter = (ListenerWithFilter<T>) registered;
            targetContext.runLater(() -> {
                LOG.trace("Calling event listener for topic {} in Dolphin Platform context {}", topic.getName(), sessionId);
                final Predicate<MessageEventContext<T>> sessionFilter = listenerAndFilter.getFilter();
                final MessageListener<T> listener = listenerAndFilter.getListener();
                try {
                    if (sessionFilter == null || sessionFilter.test(event.getMessageEventContext())) {
                        listener.onMessage(event);
                    }
                }
                catch (Exception e) {
                    LOG.error("Error in calling event listener for topic '" + topic.getName() + "' in Dolphin Platform context " + sessionId, e);
                }
            });
        }
    }

    /**
     * Tests the event against a {@link ClientSessionEventFilter} created for the context id the
     * listener was subscribed in.
     *
     * @return the filter result, or {@code false} when the event carries no message event context
     */
    private <T extends Serializable> boolean sendInSameClientSession(DolphinEvent<T> event, MessageListener<?> listener) {
        Assert.requireNonNull(event, "event");
        final String listenerSessionId = this.listenerToSessionMap.get(listener);
        final MessageEventContext<T> eventContext = event.getMessageEventContext();
        return eventContext != null && new ClientSessionEventFilter<T>(listenerSessionId).test(eventContext);
    }

    /**
     * Hook called by {@link #publish(Topic, Serializable)} after the listeners of the current
     * context have been served.
     *
     * @param var1 the published event
     */
    protected abstract <T extends Serializable> void publishForOtherSessions(DolphinEvent<T> var1);

    /** Throws a {@link RuntimeException} when {@link #init} has not been called yet. */
    private void checkInitialization() {
        if (!this.initialized.get()) {
            throw new RuntimeException("EventBus not initialized");
        }
    }

    /**
     * Collects the listeners of {@code topic} that were subscribed in the context {@code sessionId}.
     *
     * @return a new list of matching listeners, or an empty list if the topic has no listeners
     */
    private <T extends Serializable> List<ListenerWithFilter<T>> getListenersForSessionAndTopic(String sessionId, Topic<T> topic) {
        Assert.requireNonBlank(sessionId, "sessionId");
        Assert.requireNonNull(topic, "topic");
        final List<ListenerWithFilter<?>> handlers = this.topicToListenerMap.get(topic);
        if (handlers == null) {
            return Collections.emptyList();
        }
        final List<ListenerWithFilter<T>> ret = new ArrayList<>();
        for (final ListenerWithFilter<?> handler : handlers) {
            if (sessionId.equals(this.listenerToSessionMap.get(handler.getListener()))) {
                // Entries under a Topic<T> were registered with a matching T in subscribe().
                @SuppressWarnings("unchecked")
                final ListenerWithFilter<T> typed = (ListenerWithFilter<T>) handler;
                ret.add(typed);
            }
        }
        return ret;
    }

    /** Remembers {@code subscription} as belonging to the context {@code dolphinSessionId}. */
    private void addSubscriptionForSession(Subscription subscription, String dolphinSessionId) {
        // Atomic creation of the per-session list; see subscribe() for the rationale.
        this.sessionStore.computeIfAbsent(dolphinSessionId, key -> new CopyOnWriteArrayList<>()).add(subscription);
    }

    /** Forgets {@code subscription} for the context {@code dolphinSessionId}, if it is known. */
    private void removeSubscriptionForSession(Subscription subscription, String dolphinSessionId) {
        final List<Subscription> subscriptionsForSession = this.sessionStore.get(dolphinSessionId);
        if (subscriptionsForSession != null) {
            subscriptionsForSession.remove(subscription);
        }
    }

    /**
     * Removes every subscription that was created in the given context.
     *
     * @param dolphinSessionId id of the ended session; must not be blank
     */
    private void onSessionEnds(String dolphinSessionId) {
        Assert.requireNonBlank(dolphinSessionId, "dolphinSessionId");
        // remove() instead of get(): otherwise an empty list would stay in the store for every
        // session that ever subscribed.
        final List<Subscription> subscriptions = this.sessionStore.remove(dolphinSessionId);
        if (subscriptions != null) {
            for (final Subscription subscription : subscriptions) {
                subscription.unsubscribe();
            }
        }
    }

    /** Returns the Dolphin Platform context reported as current by the context provider. */
    private DolphinContext getCurrentContext() {
        return this.contextProvider.getCurrentDolphinContext();
    }
}

