/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.time.LocalTime;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.map.MapView;
import net.openhft.chronicle.engine.api.pubsub.ISubscriber;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionConsumer;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.cfg.SubscriptionStat;
import net.openhft.chronicle.engine.map.EventConsumer;
import net.openhft.chronicle.engine.map.ObjectSubscription;
import net.openhft.chronicle.engine.map.RawKVSSubscription;
import net.openhft.chronicle.engine.pubsub.MapSimpleSubscription;
import net.openhft.chronicle.engine.pubsub.SimpleSubscription;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.network.api.session.SessionDetails;
import net.openhft.chronicle.network.api.session.SessionProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fans map change events out to the subscribers registered against a key/value store.
 *
 * <p>Four independent groups of listeners are maintained: topic subscribers (receive key and
 * value), entry subscribers (receive the whole {@link MapEvent}), key subscribers (receive only
 * the key) and downstream {@link EventConsumer}s. Events are additionally forwarded to a child
 * asset named after the event key when that child holds a {@link MapSimpleSubscription}.
 *
 * <p>When a session provider and a {@code proc/subscriptions} map view are available, per-user
 * subscription statistics are recorded under the key {@code userId + "~" + subscriptionType}.
 *
 * @param <K> key type of the underlying store
 * @param <V> value type of the underlying store
 */
public class MapKVSSubscription<K, V>
        implements ObjectSubscription<K, V>, RawKVSSubscription<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(MapKVSSubscription.class);

    private final Set<TopicSubscriber<K, V>> topicSubscribers = new CopyOnWriteArraySet<>();
    private final Set<Subscriber<MapEvent<K, V>>> subscribers = new CopyOnWriteArraySet<>();
    private final Set<Subscriber<K>> keySubscribers = new CopyOnWriteArraySet<>();
    private final Set<EventConsumer<K, V>> downstream = new CopyOnWriteArraySet<>();
    @Nullable
    private final SessionProvider sessionProvider;
    @Nullable
    private final Asset asset;
    // Maps the subscriber handed in by the caller to the filtering wrapper that was registered
    // in its place, so unregisterSubscriber can find what to remove. Keyed by identity.
    // NOTE(review): this map is not synchronized while the subscriber sets are copy-on-write;
    // confirm that registration/unregistration is confined to a single thread.
    private final Map<Subscriber, Subscriber> subscriptionDelegate = new IdentityHashMap<>();
    private KeyValueStore<K, V> kvStore;
    // Lazily resolved view of "proc/subscriptions"; stays null until that view can be found.
    @Nullable
    private Map<String, SubscriptionStat> subscriptionMonitoringMap = null;

    /**
     * Creates a subscription registered on {@code asset} under the view type of the request.
     *
     * @param requestContext supplies the view type this subscription is registered under
     * @param asset          the asset this subscription belongs to
     */
    public MapKVSSubscription(@NotNull RequestContext requestContext, @NotNull Asset asset) {
        this(requestContext.viewType(), asset);
    }

    /**
     * Creates a subscription, registering it as a view of {@code asset} when both arguments
     * are non-null.
     *
     * @param viewType the view type to register under, or {@code null} to skip registration
     * @param asset    the owning asset, or {@code null} for a detached subscription
     */
    public MapKVSSubscription(@Nullable Class viewType, @Nullable Asset asset) {
        this.asset = asset;
        if (viewType != null && asset != null) {
            asset.addView(viewType, this);
        }
        this.sessionProvider = asset == null ? null : asset.findView(SessionProvider.class);
    }

    /**
     * Signals end-of-subscription to every registered listener and then empties all four
     * listener groups.
     */
    public void close() {
        this.notifyEndOfSubscription(this.topicSubscribers);
        this.notifyEndOfSubscription(this.subscribers);
        this.notifyEndOfSubscription(this.keySubscribers);
        this.notifyEndOfSubscription(this.downstream);
    }

    /**
     * Not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void onEndOfSubscription() {
        throw new UnsupportedOperationException("todo");
    }

    /** Notifies every member of {@code subscribers} of end-of-subscription, then clears the set. */
    private void notifyEndOfSubscription(@NotNull Set<? extends ISubscriber> subscribers) {
        subscribers.forEach(this::notifyEndOfSubscription);
        subscribers.clear();
    }

    /** Notifies a single subscriber; a failure is logged as a warning and does not propagate. */
    private void notifyEndOfSubscription(@NotNull ISubscriber subscriber) {
        try {
            subscriber.onEndOfSubscription();
        } catch (Exception e) {
            // Best effort: one misbehaving subscriber must not prevent the others being told.
            Jvm.warn().on(this.getClass(), "Failed to send endOfSubscription", e);
        }
    }

    /** @return {@code true} once a key/value store has been attached via {@link #setKvStore}. */
    @Override
    public boolean keyedView() {
        return this.kvStore != null;
    }

    /** Attaches the store that is used to bootstrap newly registered subscribers. */
    @Override
    public void setKvStore(KeyValueStore<K, V> kvStore) {
        this.kvStore = kvStore;
    }

    /**
     * Dispatches {@code changeEvent} to all listeners; does nothing when nobody is listening.
     *
     * @param changeEvent the event to publish
     */
    @Override
    public void notifyEvent(@NotNull MapEvent<K, V> changeEvent) {
        if (this.hasSubscribers()) {
            this.notifyEvent0(changeEvent);
        }
    }

    /** @return the number of registered key subscribers */
    @Override
    public int keySubscriberCount() {
        return this.keySubscribers.size();
    }

    /** @return the number of registered entry ({@link MapEvent}) subscribers */
    @Override
    public int entrySubscriberCount() {
        return this.subscribers.size();
    }

    /** @return the number of registered topic subscribers */
    @Override
    public int topicSubscriberCount() {
        return this.topicSubscribers.size();
    }

    /** @return {@code true} if there is any key subscriber or any value subscriber */
    @Override
    public boolean hasSubscribers() {
        return !this.keySubscribers.isEmpty() || this.hasValueSubscribers();
    }

    /**
     * @return {@code true} if there is a topic subscriber, an entry subscriber, a downstream
     *         consumer, or the owning asset (when present) has children
     */
    @Override
    public boolean hasValueSubscribers() {
        if (!this.topicSubscribers.isEmpty() || !this.subscribers.isEmpty() || !this.downstream.isEmpty()) {
            return true;
        }
        // The asset is nullable (detached subscription); without one there are no children.
        return this.asset != null && this.asset.hasChildren();
    }

    /** Publishes to the local listener groups first, then to the matching child asset. */
    private void notifyEvent0(@NotNull MapEvent<K, V> changeEvent) {
        this.notifyEvent1(changeEvent);
        this.notifyEventToChild(changeEvent);
    }

    /** Delivers the event to each non-empty listener group in the shape that group expects. */
    private void notifyEvent1(@NotNull MapEvent<K, V> changeEvent) {
        K key = changeEvent.getKey();
        if (!this.topicSubscribers.isEmpty()) {
            V value = changeEvent.getValue();
            SubscriptionConsumer.notifyEachSubscriber(this.topicSubscribers, ts -> ts.onMessage(key, value));
        }
        if (!this.subscribers.isEmpty()) {
            SubscriptionConsumer.notifyEachSubscriber(this.subscribers, s -> s.onMessage(changeEvent));
        }
        if (!this.keySubscribers.isEmpty()) {
            SubscriptionConsumer.notifyEachSubscriber(this.keySubscribers, s -> s.onMessage(key));
        }
        if (!this.downstream.isEmpty()) {
            SubscriptionConsumer.notifyEachSubscriber(this.downstream, d -> d.notifyEvent(changeEvent));
        }
    }

    /**
     * Forwards the event's value to the child asset whose name equals {@code key.toString()},
     * provided that child exists and its subscription is a {@link MapSimpleSubscription}.
     */
    private void notifyEventToChild(@NotNull MapEvent<K, V> changeEvent) {
        if (this.asset == null || !this.asset.hasChildren()) {
            return;
        }
        K key = changeEvent.getKey();
        Asset child = this.asset.getChild(key.toString());
        if (child == null) {
            return;
        }
        SubscriptionCollection subscription = child.subscription(false);
        if (subscription instanceof MapSimpleSubscription) {
            ((SimpleSubscription) subscription).notifyMessage(changeEvent.getValue());
        }
    }

    /** @return {@code true} if there is at least one entry subscriber or downstream consumer */
    @Override
    public boolean needsPrevious() {
        return !this.subscribers.isEmpty() || !this.downstream.isEmpty();
    }

    /**
     * Registers {@code subscriber} as an entry subscriber when the request's element type is
     * {@link KeyValueStore.Entry} or {@link MapEvent}, and as a key subscriber otherwise.
     *
     * @param rc         request describing the element type and bootstrap options
     * @param subscriber the subscriber to register
     * @param filter     filter applied in front of the subscriber; may be empty
     */
    @Override
    public void registerSubscriber(@NotNull RequestContext rc, @NotNull Subscriber subscriber, @NotNull Filter filter) {
        Class eClass = rc.elementType();
        if (eClass == KeyValueStore.Entry.class || eClass == MapEvent.class) {
            this.registerSubscriber0(rc, subscriber, filter);
        } else {
            this.registerKeySubscriber(rc, subscriber, filter);
        }
    }

    /**
     * Returns the subscriber to actually register: {@code subscriber} itself for an empty
     * filter, otherwise a filtering wrapper that is also remembered in the delegate map.
     */
    @NotNull
    private <T> Subscriber<T> subscriber(@NotNull Subscriber<T> subscriber, @NotNull Filter<T> filter) {
        if (filter.isEmpty()) {
            return subscriber;
        }
        Subscriber<T> filtered = new Filter.FilteredSubscriber<T>(filter, subscriber);
        this.subscriptionDelegate.put(subscriber, filtered);
        return filtered;
    }

    /**
     * Tells whether the request asks for existing content to be replayed. Only an explicit
     * {@code false} disables the bootstrap; the comparison is by value, not by reference, so a
     * non-canonical {@code Boolean} instance is handled correctly.
     */
    private static boolean bootstrapRequested(@NotNull RequestContext rc) {
        return !Boolean.FALSE.equals(rc.bootstrap());
    }

    /**
     * Registers an entry subscriber and, if requested and a store is attached, replays every
     * segment's entries to it. The subscriber is removed again if it reports itself invalid
     * during the replay, or after the replay when the request asks to end the subscription
     * after bootstrap.
     */
    private void registerSubscriber0(@NotNull RequestContext rc, @NotNull Subscriber<MapEvent<K, V>> subscriber, @NotNull Filter<MapEvent<K, V>> filter) {
        this.addToStats("subscription");
        Subscriber<MapEvent<K, V>> sub = this.subscriber(subscriber, filter);
        this.subscribers.add(sub);
        // Read the field once so the null check and the uses below see the same store.
        KeyValueStore<K, V> store = this.kvStore;
        if (!bootstrapRequested(rc) || store == null) {
            return;
        }
        try {
            for (int segment = 0; segment < store.segments(); ++segment) {
                store.entriesFor(segment, sub::onMessage);
            }
            if (Boolean.TRUE.equals(rc.endSubscriptionAfterBootstrap())) {
                sub.onEndOfSubscription();
                LOG.info("onEndOfSubscription");
                this.subscribers.remove(sub);
            }
        } catch (InvalidSubscriberException e) {
            // The subscriber declared itself invalid while bootstrapping: drop it.
            this.subscribers.remove(sub);
        }
    }

    /**
     * Registers a key subscriber and, if requested and a store is attached, replays every
     * segment's keys to it. The subscriber is removed again if it reports itself invalid during
     * the replay, or after the replay when the request asks to end the subscription after
     * bootstrap.
     *
     * @param rc         request describing the bootstrap options
     * @param subscriber the key subscriber to register
     * @param filter     filter applied in front of the subscriber; may be empty
     */
    @Override
    public void registerKeySubscriber(@NotNull RequestContext rc, @NotNull Subscriber<K> subscriber, @NotNull Filter<K> filter) {
        this.addToStats("keySubscription");
        Subscriber<K> sub = this.subscriber(subscriber, filter);
        this.keySubscribers.add(sub);
        // Read the field once so the null check and the uses below see the same store.
        KeyValueStore<K, V> store = this.kvStore;
        if (!bootstrapRequested(rc) || store == null) {
            return;
        }
        try {
            for (int segment = 0; segment < store.segments(); ++segment) {
                store.keysFor(segment, sub::onMessage);
            }
            if (Boolean.TRUE.equals(rc.endSubscriptionAfterBootstrap())) {
                sub.onEndOfSubscription();
                this.keySubscribers.remove(sub);
            }
        } catch (InvalidSubscriberException e) {
            // The subscriber declared itself invalid while bootstrapping: drop it.
            this.keySubscribers.remove(sub);
        }
    }

    /**
     * Registers a topic subscriber and, if requested and a store is attached, replays every
     * existing entry to it as a key/value pair. The subscriber is removed again if it reports
     * itself invalid during the replay.
     *
     * @param rc         request describing the bootstrap options
     * @param subscriber the topic subscriber to register
     */
    @Override
    public void registerTopicSubscriber(@NotNull RequestContext rc, @NotNull TopicSubscriber subscriber) {
        this.addToStats("topicSubscription");
        this.topicSubscribers.add(subscriber);
        // Read the field once so the null check and the uses below see the same store.
        KeyValueStore<K, V> store = this.kvStore;
        if (!bootstrapRequested(rc) || store == null) {
            return;
        }
        try {
            for (int segment = 0; segment < store.segments(); ++segment) {
                store.entriesFor(segment, e -> subscriber.onMessage(e.getKey(), e.getValue()));
            }
        } catch (InvalidSubscriberException dontAdd) {
            // The subscriber declared itself invalid while bootstrapping: drop it.
            this.topicSubscribers.remove(subscriber);
        }
    }

    /** Adds a downstream consumer that receives every published event. */
    @Override
    public void registerDownstream(@NotNull EventConsumer<K, V> subscription) {
        this.downstream.add(subscription);
    }

    /** Removes a previously registered downstream consumer. */
    public void unregisterDownstream(EventConsumer<K, V> subscription) {
        this.downstream.remove(subscription);
    }

    /**
     * Removes {@code subscriber} (or the filtering wrapper registered on its behalf) from the
     * entry and key subscriber groups, adjusts the statistics for each group it was actually
     * removed from, and finally signals end-of-subscription to it.
     *
     * @param subscriber the subscriber originally passed to a register method
     */
    @Override
    public void unregisterSubscriber(@NotNull Subscriber subscriber) {
        Subscriber delegate = this.subscriptionDelegate.remove(subscriber);
        Subscriber registered = delegate != null ? delegate : subscriber;
        boolean wasEntrySubscriber = this.subscribers.remove(registered);
        boolean wasKeySubscriber = this.keySubscribers.remove(registered);
        if (wasEntrySubscriber) {
            this.removeFromStats("subscription");
        }
        if (wasKeySubscriber) {
            this.removeFromStats("keySubscription");
        }
        if (registered != null) {
            registered.onEndOfSubscription();
        }
    }

    /**
     * Removes a topic subscriber and signals end-of-subscription to it. Statistics are only
     * adjusted when the subscriber was actually registered, mirroring
     * {@link #unregisterSubscriber(Subscriber)}.
     *
     * @param subscriber the topic subscriber to remove
     */
    @Override
    public void unregisterTopicSubscriber(@NotNull TopicSubscriber subscriber) {
        if (this.topicSubscribers.remove(subscriber)) {
            this.removeFromStats("topicSubscription");
        }
        subscriber.onEndOfSubscription();
    }

    /**
     * Resolves, and caches once found, the {@code proc/subscriptions} map view reachable from
     * the root of the owning asset.
     *
     * @return the monitoring map, or {@code null} if there is no asset or no such view yet
     */
    // The view is looked up by the raw MapView class token, so the conversion to the
    // parameterized field type cannot be checked by the compiler.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Nullable
    private Map<String, SubscriptionStat> getSubscriptionMap() {
        if (this.subscriptionMonitoringMap != null) {
            return this.subscriptionMonitoringMap;
        }
        if (this.asset == null) {
            return null;
        }
        Asset subscriptionAsset = this.asset.root().getAsset("proc/subscriptions");
        if (subscriptionAsset != null) {
            MapView monitoringView = subscriptionAsset.getView(MapView.class);
            if (monitoringView != null) {
                this.subscriptionMonitoringMap = monitoringView;
            }
        }
        return this.subscriptionMonitoringMap;
    }

    /**
     * Records a new subscription of type {@code subType} for the current session's user.
     * Does nothing when there is no session provider, no current session, or no monitoring map.
     */
    private void addToStats(String subType) {
        if (this.sessionProvider == null) {
            return;
        }
        SessionDetails sessionDetails = this.sessionProvider.get();
        if (sessionDetails == null) {
            return;
        }
        String userId = sessionDetails.userId();
        Map<String, SubscriptionStat> subStats = this.getSubscriptionMap();
        if (subStats == null) {
            return;
        }
        String statKey = userId + "~" + subType;
        SubscriptionStat stat = subStats.get(statKey);
        if (stat == null) {
            stat = new SubscriptionStat();
            stat.setFirstSubscribed(LocalTime.now());
        }
        stat.setTotalSubscriptions(stat.getTotalSubscriptions() + 1);
        stat.setActiveSubscriptions(stat.getActiveSubscriptions() + 1);
        stat.setRecentlySubscribed(LocalTime.now());
        // Written back explicitly rather than relying on in-place mutation of the stored value.
        subStats.put(statKey, stat);
    }

    /**
     * Records the end of a subscription of type {@code subType} for the current session's user.
     * Does nothing when there is no session provider, no current session, or no monitoring map.
     *
     * @throws AssertionError if no statistics entry exists for the user and subscription type
     */
    private void removeFromStats(String subType) {
        if (this.sessionProvider == null) {
            return;
        }
        SessionDetails sessionDetails = this.sessionProvider.get();
        if (sessionDetails == null) {
            return;
        }
        String userId = sessionDetails.userId();
        Map<String, SubscriptionStat> subStats = this.getSubscriptionMap();
        if (subStats == null) {
            return;
        }
        String statKey = userId + "~" + subType;
        SubscriptionStat stat = subStats.get(statKey);
        if (stat == null) {
            throw new AssertionError("There should be an active subscription");
        }
        stat.setActiveSubscriptions(stat.getActiveSubscriptions() - 1);
        stat.setRecentlySubscribed(LocalTime.now());
        // Written back explicitly rather than relying on in-place mutation of the stored value.
        subStats.put(statKey, stat);
    }
}

