/*
 * Decompiled with CFR 0.152.
 */
package org.drasyl.handler.arq.stopandwait;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.channels.ClosedChannelException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.drasyl.handler.arq.stopandwait.StopAndWaitArqAck;
import org.drasyl.handler.arq.stopandwait.StopAndWaitArqData;
import org.drasyl.util.Preconditions;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

public class StopAndWaitArqHandler
extends ChannelDuplexHandler {
    private static final Logger LOG = LoggerFactory.getLogger(StopAndWaitArqHandler.class);
    private PendingWriteQueue pendingWrites;
    private final Map<Object, ChannelPromise> promises = new IdentityHashMap<Object, ChannelPromise>();
    private int retryTimeout;
    private boolean expectedInboundSequenceNo;
    private long lastWriteAttempt;
    private Object lastWrite;

    public StopAndWaitArqHandler(int retryTimeout, boolean expectedInboundSequenceNo) {
        this.setRetryTimeout(retryTimeout);
        this.setExpectedInboundSequenceNo(expectedInboundSequenceNo);
    }

    public StopAndWaitArqHandler(int retryTimeout) {
        this(retryTimeout, false);
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.pendingWrites = new PendingWriteQueue(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.discardPendingWrites(ctx, new ClosedChannelException());
        ctx.fireChannelInactive();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof StopAndWaitArqData) {
            StopAndWaitArqData data = (StopAndWaitArqData)msg;
            if (this.expectedInboundSequenceNo == data.sequenceNo()) {
                LOG.trace("[{}] Got expected {}. Pass through.", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> data);
                this.expectedInboundSequenceNo = !this.expectedInboundSequenceNo;
                this.writeAck(ctx);
                ctx.fireChannelRead(msg);
            } else {
                LOG.trace("[{}] Got unexpected {}. Drop it.", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> data);
                data.release();
                this.writeAck(ctx);
            }
        } else if (msg instanceof StopAndWaitArqAck) {
            StopAndWaitArqAck ack = (StopAndWaitArqAck)msg;
            Boolean outboundSequenceNo = this.outboundSequenceNo();
            if (outboundSequenceNo != null && outboundSequenceNo.booleanValue() != ack.sequenceNo()) {
                LOG.trace("[{}] Got expected {}. Succeed current DATA.", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> ack);
                this.succeedCurrentWrite(ctx);
                this.writeNextPending(ctx);
            } else {
                LOG.trace("[{}] Got unexpected {}. Drop it.", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> ack);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof StopAndWaitArqData) {
            StopAndWaitArqData data = (StopAndWaitArqData)msg;
            this.promises.put(data, promise);
            this.pendingWrites.add((Object)data, promise);
            promise.addListener(future -> this.promises.remove(data));
        } else {
            ctx.write(msg, promise);
        }
    }

    public void flush(ChannelHandlerContext ctx) throws Exception {
        this.writeNextPending(ctx);
        ctx.flush();
    }

    public void setRetryTimeout(int retryTimeout) {
        this.retryTimeout = Preconditions.requirePositive((int)retryTimeout);
    }

    public void setExpectedInboundSequenceNo(boolean expectedInboundSequenceNo) {
        this.expectedInboundSequenceNo = expectedInboundSequenceNo;
    }

    private Boolean outboundSequenceNo() {
        StopAndWaitArqData currentWrite = (StopAndWaitArqData)this.pendingWrites.current();
        if (currentWrite == null) {
            return null;
        }
        return currentWrite.sequenceNo();
    }

    private void writeNextPending(ChannelHandlerContext ctx) {
        StopAndWaitArqData currentWrite;
        Channel channel = ctx.channel();
        if (!channel.isActive()) {
            this.discardPendingWrites(ctx, new ClosedChannelException());
            return;
        }
        while ((currentWrite = (StopAndWaitArqData)this.pendingWrites.current()) != null) {
            ChannelPromise promise = this.promises.get(currentWrite);
            if (promise == null || promise.isDone()) {
                this.pendingWrites.remove();
                continue;
            }
            long currentTime = System.currentTimeMillis();
            if (currentTime < this.lastWriteAttempt + (long)this.retryTimeout) break;
            this.lastWriteAttempt = currentTime;
            if (LOG.isTraceEnabled()) {
                if (this.lastWrite == currentWrite) {
                    LOG.trace("[{}] Got no ACK for current DATA. Send again.", () -> ((ChannelId)ctx.channel().id()).asShortText());
                }
                this.lastWrite = currentWrite;
            }
            LOG.trace("[{}] Write {}", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> currentWrite);
            ctx.writeAndFlush((Object)currentWrite.retainedDuplicate()).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
                if (!future.isSuccess()) {
                    promise.tryFailure(future.cause());
                    Supplier[] supplierArray = new Supplier[3];
                    supplierArray[0] = () -> ((ChannelId)ctx.channel().id()).asShortText();
                    supplierArray[1] = () -> currentWrite;
                    supplierArray[2] = () -> ((ChannelFuture)future).cause();
                    LOG.trace("[{}] Unable to write {}:", supplierArray);
                }
                ctx.executor().schedule(() -> this.writeNextPending(ctx), (long)this.retryTimeout, TimeUnit.MILLISECONDS);
            }));
            break;
        }
    }

    private void succeedCurrentWrite(ChannelHandlerContext ctx) {
        StopAndWaitArqData currentWrite = (StopAndWaitArqData)this.pendingWrites.current();
        if (currentWrite != null) {
            LOG.trace("[{}] Succeed {}.", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> currentWrite);
            this.pendingWrites.remove().trySuccess();
        }
        this.lastWriteAttempt = 0L;
    }

    private void discardPendingWrites(ChannelHandlerContext ctx, Throwable cause) {
        Supplier[] supplierArray = new Supplier[3];
        supplierArray[0] = () -> ((ChannelId)ctx.channel().id()).asShortText();
        supplierArray[1] = () -> ((PendingWriteQueue)this.pendingWrites).size();
        supplierArray[2] = () -> cause;
        LOG.trace("[{}] Discard {} pending writes:", supplierArray);
        this.pendingWrites.removeAndFailAll(cause);
    }

    private void writeAck(ChannelHandlerContext ctx) {
        if (ctx.channel().isOpen()) {
            StopAndWaitArqAck ack = this.expectedInboundSequenceNo ? StopAndWaitArqAck.STOP_AND_WAIT_ACK_1 : StopAndWaitArqAck.STOP_AND_WAIT_ACK_0;
            LOG.trace("[{}] Write {}", () -> ((ChannelId)ctx.channel().id()).asShortText(), () -> ack);
            ctx.writeAndFlush((Object)ack).addListener((GenericFutureListener)((ChannelFutureListener)future -> {
                if (!future.isSuccess()) {
                    Supplier[] supplierArray = new Supplier[3];
                    supplierArray[0] = () -> ((ChannelId)future.channel().id()).asShortText();
                    supplierArray[1] = () -> ack;
                    supplierArray[2] = () -> ((ChannelFuture)future).cause();
                    LOG.trace("[{}] Unable to send {}:", supplierArray);
                    future.channel().close();
                }
            }));
        }
    }
}

