/*
 * Decompiled with CFR 0.152.
 */
package org.drasyl.handler.pubsub;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.handler.pubsub.PubSubPublish;
import org.drasyl.handler.pubsub.PubSubPublished;
import org.drasyl.identity.DrasylAddress;
import org.drasyl.util.Preconditions;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

public class PubSubPublishHandler
extends ChannelDuplexHandler {
    public static final Duration DEFAULT_PUBLISH_TIMEOUT = Duration.ofMillis(5000L);
    private static final Logger LOG = LoggerFactory.getLogger(PubSubPublishHandler.class);
    private final Duration publishTimeout;
    private final Map<UUID, Promise<Void>> requests;
    private final DrasylAddress broker;

    PubSubPublishHandler(Duration publishTimeout, Map<UUID, Promise<Void>> requests, DrasylAddress broker) {
        this.publishTimeout = Preconditions.requireNonNegative((Duration)publishTimeout);
        this.requests = Objects.requireNonNull(requests);
        this.broker = Objects.requireNonNull(broker);
    }

    public PubSubPublishHandler(Duration publishTimeout, DrasylAddress broker) {
        this(publishTimeout, new HashMap<UUID, Promise<Void>>(), broker);
    }

    public PubSubPublishHandler(DrasylAddress broker) {
        this(DEFAULT_PUBLISH_TIMEOUT, broker);
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof PubSubPublish) {
            this.doPublish(ctx, (PubSubPublish)msg, promise);
        } else {
            ctx.write(msg, promise);
        }
    }

    private void doPublish(ChannelHandlerContext ctx, PubSubPublish msg, ChannelPromise promise) {
        LOG.trace("Send `{}` to broker `{}`.", (Object)msg, (Object)this.broker);
        ctx.write((Object)new OverlayAddressedMessage((Object)msg, this.broker)).addListener((GenericFutureListener)((FutureListener)future -> {
            if (!this.publishTimeout.isZero() && future.isSuccess()) {
                this.requests.put(msg.getId(), (Promise<Void>)promise);
                promise.addListener((GenericFutureListener)((FutureListener)future1 -> this.requests.remove(msg.getId())));
                ctx.executor().schedule(() -> promise.tryFailure((Throwable)new Exception("Got no confirmation from broker within " + this.publishTimeout.toMillis() + "ms.")), this.publishTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } else {
                PromiseNotifier.cascade((Future)future, (Promise)promise);
            }
        }));
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof OverlayAddressedMessage && ((OverlayAddressedMessage)msg).content() instanceof PubSubPublished && this.broker.equals(((OverlayAddressedMessage)msg).sender())) {
            this.handlePublished((PubSubPublished)((OverlayAddressedMessage)msg).content());
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private void handlePublished(PubSubPublished msg) {
        LOG.trace("Got `{}` from broker `{}`.", (Object)msg, (Object)this.broker);
        Promise<Void> promise = this.requests.remove(msg.getId());
        if (promise != null) {
            promise.trySuccess(null);
        }
    }
}

