/*
 * Decompiled with CFR 0.152.
 */
package org.drasyl.handler.pubsub;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.handler.pubsub.PubSubPublish;
import org.drasyl.handler.pubsub.PubSubSubscribe;
import org.drasyl.handler.pubsub.PubSubSubscribed;
import org.drasyl.handler.pubsub.PubSubUnsubscribe;
import org.drasyl.handler.pubsub.PubSubUnsubscribed;
import org.drasyl.identity.DrasylAddress;
import org.drasyl.util.Pair;
import org.drasyl.util.Preconditions;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

/**
 * Client-side pub/sub handler that talks to a single broker.
 * <p>
 * Outbound {@link PubSubSubscribe} and {@link PubSubUnsubscribe} messages are wrapped in an
 * {@link OverlayAddressedMessage} addressed to the broker. If a non-zero timeout is configured,
 * the write promise is only completed once the broker confirms the request (or the timeout
 * elapses). Inbound {@link PubSubPublish} messages from the broker are passed up the pipeline
 * only for topics contained in the local subscription set.
 * <p>
 * The request map and subscription set are plain {@link HashMap}/{@link HashSet} instances when
 * created through the public constructors; this class performs no synchronization on them.
 */
public class PubSubSubscribeHandler
extends ChannelDuplexHandler {
    public static final Duration DEFAULT_SUBSCRIBE_TIMEOUT = Duration.ofMillis(5000L);
    private static final Logger LOG = LoggerFactory.getLogger(PubSubSubscribeHandler.class);
    private final Duration subscribeTimeout;
    /** Pending (un)subscribe requests by request id: the caller's promise and the affected topic. */
    private final Map<UUID, Pair<Promise<Void>, String>> requests;
    private final DrasylAddress broker;
    /** Topics for which the broker has confirmed a subscription. */
    private final Set<String> subscriptions;

    /**
     * Creates a handler with caller-supplied state containers.
     *
     * @param subscribeTimeout how long to wait for a broker confirmation; must not be negative.
     *                         A zero duration disables confirmation tracking entirely
     * @param requests         map used to track pending requests; must not be {@code null}
     * @param broker           address of the broker; must not be {@code null}
     * @param subscriptions    set used to track confirmed topics; must not be {@code null}
     */
    PubSubSubscribeHandler(Duration subscribeTimeout, Map<UUID, Pair<Promise<Void>, String>> requests, DrasylAddress broker, Set<String> subscriptions) {
        this.subscribeTimeout = Preconditions.requireNonNegative(subscribeTimeout);
        this.requests = Objects.requireNonNull(requests);
        this.broker = Objects.requireNonNull(broker);
        this.subscriptions = Objects.requireNonNull(subscriptions);
    }

    /**
     * Creates a handler with empty request and subscription state.
     *
     * @param subscribeTimeout how long to wait for a broker confirmation; must not be negative
     * @param broker           address of the broker; must not be {@code null}
     */
    public PubSubSubscribeHandler(Duration subscribeTimeout, DrasylAddress broker) {
        this(subscribeTimeout, new HashMap<>(), broker, new HashSet<>());
    }

    /**
     * Creates a handler using {@link #DEFAULT_SUBSCRIBE_TIMEOUT}.
     *
     * @param broker address of the broker; must not be {@code null}
     */
    public PubSubSubscribeHandler(DrasylAddress broker) {
        this(DEFAULT_SUBSCRIBE_TIMEOUT, broker);
    }

    /**
     * Sends an unsubscribe for every confirmed topic, then propagates the inactive event.
     */
    public void channelInactive(ChannelHandlerContext ctx) {
        unsubscribeFromAll(ctx);
        ctx.fireChannelInactive();
    }

    /**
     * Intercepts {@link PubSubSubscribe} and {@link PubSubUnsubscribe} messages and sends them to
     * the broker; every other message is written through unchanged.
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof PubSubSubscribe) {
            doSubscribe(ctx, (PubSubSubscribe) msg, promise);
        } else if (msg instanceof PubSubUnsubscribe) {
            doUnsubscribe(ctx, (PubSubUnsubscribe) msg, promise);
        } else {
            ctx.write(msg, promise);
        }
    }

    /** Sends a subscribe request to the broker. */
    private void doSubscribe(ChannelHandlerContext ctx, PubSubSubscribe msg, ChannelPromise promise) {
        sendRequest(ctx, msg, msg.getId(), msg.getTopic(), promise);
    }

    /** Sends an unsubscribe request to the broker. */
    private void doUnsubscribe(ChannelHandlerContext ctx, PubSubUnsubscribe msg, ChannelPromise promise) {
        sendRequest(ctx, msg, msg.getId(), msg.getTopic(), promise);
    }

    /**
     * Writes {@code msg} to the broker and, when a non-zero timeout is configured and the write
     * succeeded, tracks the request until the broker confirms it or the timeout elapses.
     *
     * @param ctx     handler context used for writing and scheduling
     * @param msg     the request to send
     * @param id      id under which the pending request is tracked
     * @param topic   topic the request refers to
     * @param promise completed on confirmation, failed on timeout or write failure
     */
    private void sendRequest(ChannelHandlerContext ctx, Object msg, UUID id, String topic, ChannelPromise promise) {
        LOG.trace("Send `{}` to broker `{}`.", msg, broker);
        ctx.write(new OverlayAddressedMessage<>(msg, broker)).addListener((FutureListener<Void>) writeFuture -> {
            if (!subscribeTimeout.isZero() && writeFuture.isSuccess()) {
                requests.put(id, Pair.of(promise, topic));
                final long timeoutMillis = subscribeTimeout.toMillis();
                // Timeout guard: fails the promise if no confirmation arrived in time.
                final Future<?> timeoutTask = ctx.executor().schedule(() -> promise.tryFailure(new Exception("Got no confirmation from broker within " + timeoutMillis + "ms.")), timeoutMillis, TimeUnit.MILLISECONDS);
                promise.addListener((FutureListener<Void>) done -> {
                    requests.remove(id);
                    // Drop the guard as soon as the request is settled so it does not linger
                    // in the executor's queue (and keep the promise reachable) until expiry.
                    timeoutTask.cancel(false);
                });
            } else {
                // NOTE(review): with a zero timeout the request is never tracked, so a later
                // confirmation from the broker will not update the subscription set. Confirm
                // whether this is intended.
                PromiseNotifier.cascade(writeFuture, promise);
            }
        });
    }

    /**
     * Consumes pub/sub messages that were sent by the broker; everything else is passed on.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof OverlayAddressedMessage) {
            final OverlayAddressedMessage<?> envelope = (OverlayAddressedMessage<?>) msg;
            final Object content = envelope.content();
            final boolean pubSubContent = content instanceof PubSubSubscribed
                    || content instanceof PubSubUnsubscribed
                    || content instanceof PubSubPublish
                    || content instanceof PubSubUnsubscribe;
            // Only messages originating from our broker are handled here.
            if (pubSubContent && broker.equals(envelope.sender())) {
                if (content instanceof PubSubSubscribed) {
                    handleSubscribed((PubSubSubscribed) content);
                } else if (content instanceof PubSubUnsubscribed) {
                    handleUnsubscribed((PubSubUnsubscribed) content);
                } else if (content instanceof PubSubPublish) {
                    handlePublish(ctx, (PubSubPublish) content);
                } else {
                    handleUnsubscribe((PubSubUnsubscribe) content);
                }
                return;
            }
        }
        ctx.fireChannelRead(msg);
    }

    /**
     * Handles a subscribe confirmation: records the topic and completes the pending promise.
     * Confirmations without a matching pending request are ignored.
     */
    private void handleSubscribed(PubSubSubscribed msg) {
        LOG.trace("Got `{}` from broker `{}`.", msg, broker);
        final Pair<Promise<Void>, String> pending = requests.remove(msg.getId());
        if (pending == null) {
            return;
        }
        final String topic = pending.second();
        if (subscriptions.add(topic)) {
            LOG.debug("Subscribed to topic `{}` at broker `{}`.", topic, broker);
        }
        pending.first().trySuccess(null);
    }

    /**
     * Handles an unsubscribe confirmation: forgets the topic and completes the pending promise.
     * Confirmations without a matching pending request are ignored.
     */
    private void handleUnsubscribed(PubSubUnsubscribed msg) {
        LOG.trace("Got `{}` from broker `{}`.", msg, broker);
        final Pair<Promise<Void>, String> pending = requests.remove(msg.getId());
        if (pending == null) {
            return;
        }
        final String topic = pending.second();
        if (subscriptions.remove(topic)) {
            LOG.debug("Unsubscribed from topic `{}` from broker `{}`.", topic, broker);
        }
        pending.first().trySuccess(null);
    }

    /**
     * Passes a publication up the pipeline if its topic is subscribed; otherwise releases it.
     */
    private void handlePublish(ChannelHandlerContext ctx, PubSubPublish msg) {
        LOG.trace("Got `{}` from broker `{}`.", msg, broker);
        final String topic = msg.getTopic();
        if (subscriptions.contains(topic)) {
            LOG.trace("Got publication for topic `{}` from broker `{}`: {}", topic, broker, msg.getContent());
            ctx.fireChannelRead(msg);
        } else {
            LOG.trace("Got publication for topic `{}` from broker `{}` we're not subscribed to. Discard publication: {}", topic, broker, msg.getContent());
            msg.release();
        }
    }

    /**
     * Handles an unsubscribe that was initiated by the broker by forgetting the topic locally.
     */
    private void handleUnsubscribe(PubSubUnsubscribe msg) {
        LOG.trace("Got `{}` from broker `{}`.", msg, broker);
        final String topic = msg.getTopic();
        if (subscriptions.remove(topic)) {
            LOG.debug("Unsubscribed from topic `{}` as broker `{}` is shutting down.", topic, broker);
        }
    }

    /**
     * Writes an unsubscribe for every confirmed topic, flushes once, and clears the local set.
     * Does nothing when there are no subscriptions.
     */
    private void unsubscribeFromAll(ChannelHandlerContext ctx) {
        if (subscriptions.isEmpty()) {
            return;
        }
        for (String topic : subscriptions) {
            LOG.trace("Channel is closing. Unsubscribe from topic `{}` at broker `{}`.", topic, broker);
            ctx.write(new OverlayAddressedMessage<>(PubSubUnsubscribe.of(topic), broker));
        }
        ctx.flush();
        subscriptions.clear();
    }
}

