/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.conductor.DcpChannelControlHandler;
import com.couchbase.client.dcp.config.ClientEnvironment;
import com.couchbase.client.dcp.message.DcpCloseStreamResponse;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpNoopRequest;
import com.couchbase.client.dcp.message.DcpNoopResponse;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleState;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleStateEvent;

public class DcpMessageHandler
extends ChannelDuplexHandler {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DcpMessageHandler.class);
    private final DataEventHandler dataEventHandler;
    private final DcpChannelControlHandler controlHandler;
    private final ChannelFlowController flowController;

    DcpMessageHandler(Channel channel, ClientEnvironment environment, DcpChannelControlHandler controlHandler) {
        this.dataEventHandler = environment.dataEventHandler();
        this.controlHandler = controlHandler;
        this.flowController = new ChannelFlowController(channel, environment);
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent e;
        if (evt instanceof IdleStateEvent && (e = (IdleStateEvent)evt).state() == IdleState.READER_IDLE) {
            LOGGER.warn("Closing dead connection.");
            ctx.close();
            return;
        }
        super.userEventTriggered(ctx, evt);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf message = (ByteBuf)msg;
        if (DcpMessageHandler.isDataMessage(message)) {
            this.dataEventHandler.onEvent(this.flowController, message);
        } else if (DcpMessageHandler.isControlMessage(message)) {
            this.controlHandler.onEvent(this.flowController, message);
        } else if (DcpNoopRequest.is(message)) {
            ByteBuf buffer = ctx.alloc().buffer();
            DcpNoopResponse.init(buffer);
            MessageUtil.setOpaque(MessageUtil.getOpaque(message), buffer);
            ctx.writeAndFlush((Object)buffer);
        } else {
            LOGGER.warn("Unknown DCP Message, ignoring. \n{}", (Object)MessageUtil.humanize(message));
        }
    }

    private static boolean isControlMessage(ByteBuf msg) {
        return DcpOpenStreamResponse.is(msg) || DcpStreamEndMessage.is(msg) || DcpSnapshotMarkerRequest.is(msg) || DcpFailoverLogResponse.is(msg) || DcpCloseStreamResponse.is(msg) || DcpGetPartitionSeqnosResponse.is(msg);
    }

    private static boolean isDataMessage(ByteBuf msg) {
        return DcpMutationMessage.is(msg) || DcpDeletionMessage.is(msg) || DcpExpirationMessage.is(msg);
    }
}

