/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.core.event.EventBus;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ControlEventHandler} bound to a single {@link DcpChannel}.
 *
 * <p>Stream-end messages are intercepted and handled here; every other
 * control event is passed on unchanged to the handler obtained from the
 * channel's environment.
 */
public class DcpChannelControlHandler implements ControlEventHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DcpChannelControlHandler.class);

    /** Channel whose stream state and conductor are updated on stream end. */
    private final DcpChannel dcpChannel;

    /** Handler from the channel's environment that receives all non-stream-end events. */
    private final ControlEventHandler controlEventHandler;

    /** Event bus from the channel's environment, used to publish stream-end events. */
    private final EventBus eventBus;

    /**
     * Creates a handler for the given channel, capturing the control event
     * handler and event bus exposed by the channel's environment.
     *
     * @param dcpChannel the channel this handler serves
     */
    public DcpChannelControlHandler(DcpChannel dcpChannel) {
        this.dcpChannel = dcpChannel;
        this.controlEventHandler = dcpChannel.env.controlEventHandler();
        this.eventBus = dcpChannel.env.eventBus();
    }

    /**
     * Dispatches a control event: stream-end messages are processed by this
     * class, anything else goes to the wrapped handler.
     *
     * @param flowController flow controller used to acknowledge the message
     * @param buf the raw control message
     */
    @Override
    public void onEvent(ChannelFlowController flowController, ByteBuf buf) {
        if (!DcpStreamEndMessage.is(buf)) {
            // Not ours: the delegate receives the buffer as-is; it is neither
            // acknowledged nor released here.
            this.controlEventHandler.onEvent(flowController, buf);
            return;
        }
        this.filterDcpStreamEndMessage(flowController, buf);
    }

    /**
     * Handles a stream-end message: emits a {@link StreamEndEvent}, marks the
     * partition's stream as closed on the channel and, unless the stream ended
     * with {@link StreamEndReason#OK}, asks the conductor whether the partition
     * should be moved. The message is always acknowledged and released
     * afterwards, even if processing throws.
     */
    private void filterDcpStreamEndMessage(ChannelFlowController flowController, ByteBuf buf) {
        try {
            final int partition = DcpStreamEndMessage.vbucket(buf);
            final StreamEndReason endReason = DcpStreamEndMessage.reason(buf);
            LOGGER.debug("Server closed Stream on vbid {} with reason {}", partition, endReason);

            this.routeStreamEnd(new StreamEndEvent(partition, endReason));
            this.dcpChannel.streamIsOpen.set(partition, false);

            if (endReason == StreamEndReason.OK) {
                return;
            }
            this.dcpChannel.conductor.maybeMovePartition(partition);
        } finally {
            acknowledgeAndRelease(flowController, buf);
        }
    }

    /**
     * Delivers the event to the environment's stream event buffer when
     * persistence polling is enabled, otherwise publishes it on the event bus.
     */
    private void routeStreamEnd(StreamEndEvent streamEnd) {
        if (!this.dcpChannel.env.persistencePollingEnabled()) {
            this.eventBus.publish(streamEnd);
            return;
        }
        this.dcpChannel.env.streamEventBuffer().onStreamEnd(streamEnd);
    }

    /**
     * Acknowledges the message with the flow controller and then releases the
     * buffer; the release happens even when the acknowledgement throws.
     */
    private static void acknowledgeAndRelease(ChannelFlowController flowController, ByteBuf buf) {
        try {
            flowController.ack(buf);
        } finally {
            buf.release();
        }
    }
}

