/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map.remote;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.query.IndexQuery;
import net.openhft.chronicle.engine.api.query.IndexQueueView;
import net.openhft.chronicle.engine.api.query.IndexedValue;
import net.openhft.chronicle.engine.api.query.VanillaIndexQuery;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.IndexQueueViewHandler;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteIndexQueueView<K extends Marshallable, V extends Marshallable>
extends AbstractStatelessClient<MapWireHandler.EventId>
implements IndexQueueView<Subscriber<IndexedValue<V>>, V> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteIndexQueueView.class);
    private final Map<Object, Long> subscribersToTid = new ConcurrentHashMap<Object, Long>();
    int i;

    public RemoteIndexQueueView(@NotNull RequestContext context, @NotNull Asset asset) {
        super(asset.findView(TcpChannelHub.class), 0L, RemoteIndexQueueView.toUri(context));
    }

    @NotNull
    private static String toUri(@NotNull RequestContext context) {
        return context.viewType(IndexQueueView.class).toUri();
    }

    @Override
    public void registerSubscriber(final @NotNull Subscriber<IndexedValue<V>> subscriber, final @NotNull IndexQuery<V> vanillaIndexQuery) {
        final AtomicBoolean hasAlreadySubscribed = new AtomicBoolean();
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        AbstractAsyncSubscription asyncSubscription = new AbstractAsyncSubscription(this.hub, this.csp, "RemoteIndexQueueView registerTopicSubscriber"){
            volatile long fromIndex;
            {
                super(arg0, arg1, arg2);
                this.fromIndex = 0L;
            }

            public void onSubscribe(@NotNull WireOut wireOut) {
                IndexQuery q;
                if (hasAlreadySubscribed.getAndSet(true)) {
                    VanillaIndexQuery query = (VanillaIndexQuery)vanillaIndexQuery.deepCopy();
                    query.fromIndex(this.fromIndex + 1L);
                    query.bootstrap(false);
                    q = query;
                } else {
                    q = vanillaIndexQuery;
                }
                RemoteIndexQueueView.this.subscribersToTid.put(subscriber, this.tid());
                wireOut.writeEventName((WireKey)IndexQueueViewHandler.EventId.registerSubscriber).typedMarshallable((WriteMarshallable)q);
            }

            public void onConsumer(@NotNull WireIn inWire) {
                try (DocumentContext dc = inWire.readingDocument();){
                    if (!dc.isPresent()) {
                        return;
                    }
                    StringBuilder sb = Wires.acquireStringBuilder();
                    ValueIn valueIn = dc.wire().readEventName(sb);
                    if (CoreFields.reply.contentEquals((CharSequence)sb)) {
                        try {
                            IndexedValue e = (IndexedValue)valueIn.typedMarshallable();
                            this.fromIndex = Math.max(this.fromIndex, e.index());
                            subscriber.onMessage(e);
                        }
                        catch (InvalidSubscriberException e) {
                            RemoteIndexQueueView.this.unregisterSubscriber(subscriber);
                        }
                    } else if (IndexQueueViewHandler.EventId.onEndOfSubscription.contentEquals(sb)) {
                        subscriber.onEndOfSubscription();
                        RemoteIndexQueueView.this.hub.unsubscribe(this.tid());
                    }
                }
                catch (Exception e) {
                    Jvm.warn().on(((Object)((Object)this)).getClass(), (Throwable)e);
                }
            }
        };
        this.hub.subscribe((AsyncSubscription)asyncSubscription);
    }

    @Override
    public void unregisterSubscriber(@NotNull Subscriber<IndexedValue<V>> listener) {
        Long tid = this.subscribersToTid.get(listener);
        if (tid == null) {
            Jvm.warn().on(this.getClass(), "There is no subscription to unsubscribe, was " + this.subscribersToTid.size() + " other subscriptions.");
            return;
        }
        this.hub.preventSubscribeUponReconnect(tid.longValue());
        if (!this.hub.isOpen()) {
            this.hub.unsubscribe(tid.longValue());
            return;
        }
        this.hub.lock(() -> {
            this.writeMetaDataForKnownTID(tid);
            this.hub.outWire().writeDocument(false, wireOut -> wireOut.writeEventName((WireKey)IndexQueueViewHandler.EventId.unregisterSubscriber).text(""));
        });
    }
}

