/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.buffer.DcpRequestDispatcher;
import com.couchbase.client.dcp.conductor.DcpChannelControlHandler;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.deps.io.netty.channel.Channel;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelInboundHandlerAdapter;
import com.couchbase.client.dcp.deps.io.netty.handler.timeout.IdleState;
import com.couchbase.client.dcp.deps.io.netty.handler.timeout.IdleStateEvent;
import com.couchbase.client.dcp.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.EventExecutor;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.ImmediateEventExecutor;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.Promise;
import com.couchbase.client.dcp.events.Tracer;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpNoopRequest;
import com.couchbase.client.dcp.message.DcpNoopResponse;
import com.couchbase.client.dcp.message.DcpSeqnoAdvancedRequest;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.DcpSystemEventRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.metrics.DcpChannelMetrics;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.dcp.transport.netty.ChannelFlowControllerImpl;
import com.couchbase.client.dcp.transport.netty.DcpResponse;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DcpMessageHandler
extends ChannelInboundHandlerAdapter
implements DcpRequestDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(DcpMessageHandler.class);
    private final DataEventHandler dataEventHandler;
    private final DcpChannelControlHandler controlHandler;
    private final ChannelFlowController flowController;
    private final DcpChannelMetrics metrics;
    private final Tracer tracer;
    private int nextOpaque = Integer.MIN_VALUE;
    private volatile ChannelHandlerContext volatileContext;
    private final Queue<OutstandingRequest> outstandingRequests = new ArrayDeque<OutstandingRequest>();

    DcpMessageHandler(Channel channel, Client.Environment environment, DcpChannelControlHandler controlHandler, DcpChannelMetrics metrics) {
        this.dataEventHandler = environment.dataEventHandler();
        this.controlHandler = controlHandler;
        this.flowController = new ChannelFlowControllerImpl(channel, environment);
        this.metrics = Objects.requireNonNull(metrics);
        this.tracer = environment.tracer();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.tracer.onConnectionOpen(String.valueOf(ctx.channel()));
        this.volatileContext = ctx;
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.tracer.onConnectionClose(String.valueOf(ctx.channel()));
        this.volatileContext = null;
        NotConnectedException connectionClosed = new NotConnectedException("Channel became inactive while awaiting response.");
        for (OutstandingRequest request : this.outstandingRequests) {
            try {
                request.promise.setFailure(connectionClosed);
            }
            catch (Throwable t) {
                LOGGER.error("Failed to set promise failure", t);
            }
        }
        this.outstandingRequests.clear();
        super.channelInactive(ctx);
    }

    @Override
    public Future<DcpResponse> sendRequest(final ByteBuf request) {
        ChannelHandlerContext ctx = this.volatileContext;
        if (ctx == null) {
            ReferenceCountUtil.safeRelease(request);
            return ImmediateEventExecutor.INSTANCE.newFailedFuture(new NotConnectedException("Failed to issue request; channel is not active."));
        }
        EventExecutor executor = ctx.executor();
        final Promise<DcpResponse> promise = executor.newPromise();
        if (executor.inEventLoop()) {
            this.unsafeSendRequest(ctx, request, promise);
            return promise;
        }
        try {
            executor.submit(new Runnable(){

                @Override
                public void run() {
                    ChannelHandlerContext ctx = DcpMessageHandler.this.volatileContext;
                    if (ctx == null) {
                        ReferenceCountUtil.safeRelease(request);
                        promise.setFailure(new NotConnectedException("Failed to issue request; channel is not active."));
                    } else {
                        DcpMessageHandler.this.unsafeSendRequest(ctx, request, promise);
                    }
                }
            });
        }
        catch (Throwable t) {
            ReferenceCountUtil.safeRelease(request);
            promise.setFailure(t);
        }
        return promise;
    }

    private void unsafeSendRequest(ChannelHandlerContext ctx, ByteBuf request, Promise<DcpResponse> promise) {
        if (!ctx.executor().inEventLoop()) {
            throw new IllegalStateException("Must not be called outside event loop");
        }
        try {
            this.metrics.trackDcpRequest(promise, request);
            int opaque = this.nextOpaque++;
            MessageUtil.setOpaque(opaque, request);
            ctx.writeAndFlush(request, ctx.voidPromise());
            this.outstandingRequests.add(new OutstandingRequest(opaque, promise));
        }
        catch (Throwable t) {
            promise.setFailure(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf message = (ByteBuf)msg;
        this.metrics.incrementBytesRead(message.readableBytes());
        byte magic = message.getByte(0);
        if (magic != -127) {
            this.handleRequest(ctx, message);
            return;
        }
        OutstandingRequest request = this.outstandingRequests.poll();
        if (request == null || MessageUtil.getOpaque(message) != request.opaque) {
            try {
                if (request != null) {
                    request.promise.setFailure(new IOException("Response arrived out of order"));
                }
                LOGGER.error("Unexpected response with opaque {} (expected {}); closing connection", (Object)MessageUtil.getOpaque(message), request == null ? "none" : Integer.valueOf(request.opaque));
                ctx.close();
                return;
            }
            finally {
                message.release();
            }
        }
        request.promise.setSuccess(new DcpResponse(message));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent e;
        if (evt instanceof IdleStateEvent && (e = (IdleStateEvent)evt).state() == IdleState.READER_IDLE) {
            this.metrics.incrementDeadConnections();
            LOGGER.warn("Closing dead connection.");
            ctx.close();
            return;
        }
        super.userEventTriggered(ctx, evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleRequest(ChannelHandlerContext ctx, ByteBuf message) {
        this.metrics.recordServerRequest(message);
        if (DcpMessageHandler.isDataMessage(message)) {
            this.tracer.onDataEvent(message, ctx.channel());
            this.dataEventHandler.onEvent(this.flowController, message);
        } else if (DcpMessageHandler.isControlMessage(message)) {
            this.tracer.onControlEvent(message, ctx.channel());
            this.controlHandler.onEvent(this.flowController, message);
        } else {
            if (DcpNoopRequest.is(message)) {
                try {
                    ByteBuf buffer = ctx.alloc().buffer();
                    DcpNoopResponse.init(buffer);
                    MessageUtil.setOpaque(MessageUtil.getOpaque(message), buffer);
                    ctx.writeAndFlush(buffer);
                }
                finally {
                    message.release();
                }
            }
            try {
                LOGGER.warn("Unknown DCP Message, ignoring. \n{}", (Object)MessageUtil.humanize(message));
            }
            finally {
                message.release();
            }
        }
    }

    private static boolean isControlMessage(ByteBuf msg) {
        return DcpStreamEndMessage.is(msg) || DcpSnapshotMarkerRequest.is(msg) || DcpSeqnoAdvancedRequest.is(msg) || DcpSystemEventRequest.is(msg);
    }

    private static boolean isDataMessage(ByteBuf msg) {
        return DcpMutationMessage.is(msg) || DcpDeletionMessage.is(msg) || DcpExpirationMessage.is(msg);
    }

    private static class OutstandingRequest {
        private final int opaque;
        private final Promise<DcpResponse> promise;

        private OutstandingRequest(int opaque, Promise<DcpResponse> promise) {
            this.opaque = opaque;
            this.promise = Objects.requireNonNull(promise);
        }
    }
}

