/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map.remote;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.map.KeyValueStore;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.EventConsumer;
import net.openhft.chronicle.engine.map.ObjectSubscription;
import net.openhft.chronicle.engine.map.remote.AbstractRemoteSubscription;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ObjectKVSubscriptionHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteKVSSubscription<K, V>
extends AbstractRemoteSubscription<MapEvent<K, V>>
implements ObjectSubscription<K, V>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(MapWireHandler.class);
    private final Class<K> kClass;
    private final Class<V> vClass;
    private RequestContext rc;

    public RemoteKVSSubscription(@NotNull RequestContext context, @NotNull Asset asset) {
        super(asset.findView(TcpChannelHub.class), 0L, RemoteKVSSubscription.toUri(context));
        this.kClass = context.keyType();
        this.vClass = context.valueType();
        this.rc = context;
    }

    @NotNull
    private static String toUri(@NotNull RequestContext context) {
        StringBuilder sb = Wires.acquireStringBuilder();
        sb.append(context.fullName()).append("?view=subscription");
        if (context.messageType() != String.class) {
            sb.append("&messageType=").append(ClassAliasPool.CLASS_ALIASES.nameFor(context.messageType()));
        }
        if (context.elementType() != String.class) {
            sb.append("&elementType=").append(ClassAliasPool.CLASS_ALIASES.nameFor(context.elementType()));
        }
        return sb.toString();
    }

    @Override
    public void registerTopicSubscriber(final @NotNull RequestContext rc, final @NotNull TopicSubscriber<K, V> subscriber) {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        this.hub.subscribe((AsyncSubscription)new AbstractAsyncSubscription(this.hub, this.csp, "Remove KV Subscription registerTopicSubscriber"){

            public void onSubscribe(@NotNull WireOut wireOut) {
                RemoteKVSSubscription.this.subscribersToTid.put(subscriber, this.tid());
                wireOut.writeEventName((WireKey)ObjectKVSubscriptionHandler.EventId.registerTopicSubscriber).marshallable(m -> {
                    m.write(() -> "keyType").typeLiteral(RemoteKVSSubscription.this.kClass);
                    m.write(() -> "valueType").typeLiteral(RemoteKVSSubscription.this.vClass);
                    if (rc.bootstrap() != null) {
                        m.writeEventName(() -> "bootstrap").bool(rc.bootstrap());
                    }
                });
            }

            public void onConsumer(@NotNull WireIn inWire) {
                inWire.readDocument(null, d -> {
                    StringBuilder sb = Wires.acquireStringBuilder();
                    ValueIn valueIn = d.readEventName(sb);
                    if (CoreFields.reply.contentEquals((CharSequence)sb)) {
                        valueIn.marshallable(m -> {
                            Object topic = m.read(() -> "topic").object(RemoteKVSSubscription.this.kClass);
                            Object message = m.read(() -> "message").object(RemoteKVSSubscription.this.vClass);
                            RemoteKVSSubscription.this.onEvent(topic, message, subscriber);
                        });
                    } else if (ObjectKVSubscriptionHandler.EventId.onEndOfSubscription.contentEquals(sb)) {
                        RemoteKVSSubscription.this.onEndOfSubscription();
                        RemoteKVSSubscription.this.hub.unsubscribe(this.tid());
                    }
                });
            }
        });
    }

    private void onEvent(K topic, @Nullable V message, @NotNull TopicSubscriber<K, V> subscriber) {
        try {
            subscriber.onMessage(topic, message);
        }
        catch (InvalidSubscriberException noLongerValid) {
            this.unregisterTopicSubscriber(subscriber);
        }
    }

    @Override
    public void unregisterTopicSubscriber(@NotNull TopicSubscriber subscriber) {
        Long tid = (Long)this.subscribersToTid.get(subscriber);
        if (tid == null) {
            Jvm.warn().on(this.getClass(), "There is no subscription to unsubscribe, was " + this.subscribersToTid.size() + " other subscriptions.");
            return;
        }
        this.hub.preventSubscribeUponReconnect(tid.longValue());
        if (!this.hub.isOpen()) {
            this.hub.unsubscribe(tid.longValue());
            return;
        }
        this.hub.lock(() -> {
            this.writeMetaDataForKnownTID(tid);
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)ObjectKVSubscriptionHandler.EventId.unregisterTopicSubscriber).text(""));
        });
    }

    @Override
    public void registerKeySubscriber(@NotNull RequestContext rc, @NotNull Subscriber<K> subscriber, @NotNull Filter<K> filter) {
        this.registerSubscriber0(rc, subscriber, filter);
    }

    @Override
    public boolean needsPrevious() {
        return true;
    }

    @Override
    public void setKvStore(KeyValueStore<K, V> store) {
    }

    @Override
    public void notifyEvent(MapEvent<K, V> mpe) {
        throw new UnsupportedOperationException("");
    }

    @Override
    public boolean hasSubscribers() {
        throw new UnsupportedOperationException("has subscribers, is only implemented on the server");
    }

    @Override
    public void registerDownstream(@NotNull EventConsumer<K, V> subscription) {
        this.registerSubscriber(this.rc.clone().messageType(this.rc.messageType()).elementType(MapEvent.class), subscription::notifyEvent, Filter.empty());
    }
}

