/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.buffer;

import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.core.event.EventBus;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.message.DcpDeletionMessage;
import com.couchbase.client.dcp.message.DcpExpirationMessage;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSeqnoAdvancedRequest;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpSystemEventRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers DCP stream events per partition (vbucket) and forwards them to the
 * configured handlers once {@link #onSeqnoPersisted(int, long)} reports a
 * sequence number at or beyond the event's own sequence number.
 *
 * <p>Each partition has its own queue; every access to a queue is guarded by
 * synchronizing on that queue instance.
 */
public class StreamEventBuffer
implements DataEventHandler,
ControlEventHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamEventBuffer.class);

    /** Number of per-partition queues allocated up front. */
    private static final int MAX_PARTITIONS = 1024;

    private final EventBus eventBus;
    private volatile DataEventHandler dataEventHandler;
    private volatile ControlEventHandler controlEventHandler;

    /** One queue per partition, indexed by vbucket id. The list itself is unmodifiable. */
    private final List<Deque<BufferedEvent>> partitionQueues;

    /**
     * Creates a buffer with an empty queue for each of the {@code MAX_PARTITIONS} partitions.
     *
     * @param eventBus bus on which stream end events are published
     */
    public StreamEventBuffer(EventBus eventBus) {
        this.eventBus = eventBus;
        List<Deque<BufferedEvent>> queues = new ArrayList<>(MAX_PARTITIONS);
        for (int i = 0; i < MAX_PARTITIONS; ++i) {
            queues.add(new ArrayDeque<BufferedEvent>());
        }
        this.partitionQueues = Collections.unmodifiableList(queues);
    }

    /**
     * Sets the handler that receives buffered data events when they are released.
     *
     * @param dataEventHandler the downstream data event handler
     */
    public void setDataEventHandler(DataEventHandler dataEventHandler) {
        this.dataEventHandler = dataEventHandler;
    }

    /**
     * Sets the handler that receives control events, both buffered and pass-through.
     *
     * @param controlEventHandler the downstream control event handler
     */
    public void setControlEventHandler(ControlEventHandler controlEventHandler) {
        this.controlEventHandler = controlEventHandler;
    }

    /**
     * Classifies an incoming event and either buffers it or forwards it immediately.
     *
     * <ul>
     * <li>Mutation, deletion and expiration messages are buffered as data events.</li>
     * <li>Snapshot markers, system events and seqno-advanced requests are buffered
     * as control events.</li>
     * <li>A rollback message first drops buffered events above the rollback seqno,
     * then is forwarded to the control handler.</li>
     * <li>Anything else is forwarded to the control handler unbuffered.</li>
     * </ul>
     *
     * @param flowController flow controller associated with the event
     * @param event the raw event buffer
     */
    @Override
    public void onEvent(ChannelFlowController flowController, ByteBuf event) {
        if (DcpMutationMessage.is(event) || DcpDeletionMessage.is(event) || DcpExpirationMessage.is(event)) {
            // NOTE(review): the mutation accessor is used for all three message kinds;
            // presumably they share the by-seqno layout -- confirm against the message classes.
            long seqno = DcpMutationMessage.bySeqno(event);
            int vbucket = MessageUtil.getVbucket(event);
            enqueue(vbucket, new BufferedEvent(seqno, event, flowController, BufferedEvent.Type.DATA));
        } else if (DcpSnapshotMarkerRequest.is(event)) {
            long seqno = DcpSnapshotMarkerRequest.startSeqno(event);
            int vbucket = MessageUtil.getVbucket(event);
            enqueue(vbucket, new BufferedEvent(seqno, event, flowController, BufferedEvent.Type.CONTROL));
        } else if (RollbackMessage.is(event)) {
            rollback(RollbackMessage.vbucket(event), RollbackMessage.seqno(event));
            this.controlEventHandler.onEvent(flowController, event);
        } else if (DcpSystemEventRequest.is(event)) {
            int vbucket = MessageUtil.getVbucket(event);
            long seqno = DcpSystemEventRequest.getSeqno(event);
            enqueue(vbucket, new BufferedEvent(seqno, event, flowController, BufferedEvent.Type.CONTROL));
        } else if (DcpSeqnoAdvancedRequest.is(event)) {
            int vbucket = MessageUtil.getVbucket(event);
            long seqno = DcpSeqnoAdvancedRequest.getSeqno(event);
            enqueue(vbucket, new BufferedEvent(seqno, event, flowController, BufferedEvent.Type.CONTROL));
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Propagating unhandled control event: {}", MessageUtil.humanize(event));
            }
            this.controlEventHandler.onEvent(flowController, event);
        }
    }

    /**
     * Handles the end of a stream for the event's partition.
     *
     * <p>If the reason is not {@link StreamEndReason#OK}, the partition's buffer is
     * cleared and the event is published immediately. If the reason is OK and nothing
     * is buffered, the event is published immediately. Otherwise a stream-end marker
     * is appended to the queue so that it is published only after the events ahead of
     * it have been released.
     *
     * @param event the stream end notification
     */
    public void onStreamEnd(StreamEndEvent event) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(event.partition());
        synchronized (queue) {
            if (event.reason() != StreamEndReason.OK) {
                clear(event.partition());
                this.eventBus.publish(event);
                return;
            }
            if (queue.isEmpty()) {
                this.eventBus.publish(event);
                return;
            }
            // The marker borrows the seqno of the last buffered event so that it is
            // released by the same onSeqnoPersisted call that releases that event.
            long fakeSeqno = queue.peekLast().seqno;
            queue.add(BufferedEvent.streamEnd(fakeSeqno));
        }
    }

    /**
     * Appends an event to the given partition's queue.
     *
     * @param vbucket partition index
     * @param event event to buffer
     */
    private void enqueue(int vbucket, BufferedEvent event) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            queue.add(event);
        }
    }

    /**
     * Discards every buffered event for the given partition and empties its queue.
     *
     * @param vbucket partition index
     */
    public void clear(int vbucket) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            LOGGER.debug("Clearing stream event buffer for partition {}", vbucket);
            for (BufferedEvent bufferedEvent : queue) {
                bufferedEvent.discard();
            }
            queue.clear();
        }
    }

    /**
     * Discards and removes buffered events whose seqno is greater than {@code toSeqno}
     * (compared as unsigned values).
     *
     * @param vbucket partition index
     * @param toSeqno rollback point; events at or below it are kept
     */
    private void rollback(int vbucket, long toSeqno) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            Iterator<BufferedEvent> i = queue.iterator();
            while (i.hasNext()) {
                BufferedEvent event = i.next();
                boolean eventSeqnoIsGreaterThanRollbackSeqno = Long.compareUnsigned(event.seqno, toSeqno) > 0;
                if (!eventSeqnoIsGreaterThanRollbackSeqno) {
                    continue;
                }
                LOGGER.trace("Dropping event with seqno {} from stream buffer for partition {}", event.seqno, vbucket);
                event.discard();
                i.remove();
            }
        }
    }

    /**
     * Reports whether the given partition currently has any buffered events.
     *
     * @param vbucket partition index
     * @return {@code true} if the partition's queue is non-empty
     */
    boolean hasBufferedEvents(int vbucket) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        synchronized (queue) {
            return !queue.isEmpty();
        }
    }

    /**
     * Releases, in queue order, every buffered event at the head of the partition's
     * queue whose seqno is less than or equal to {@code seqno} (unsigned comparison).
     *
     * <p>Data events go to the data handler, control events to the control handler,
     * and stream-end markers are published on the event bus as an OK stream end.
     * Anything thrown while dispatching a single event is logged and does not stop
     * the remaining events from being processed.
     *
     * @param vbucket partition index
     * @param seqno the persisted sequence number
     */
    void onSeqnoPersisted(int vbucket, long seqno) {
        Deque<BufferedEvent> queue = this.partitionQueues.get(vbucket);
        // Hold the queue's monitor while draining, like every other accessor of the queue.
        synchronized (queue) {
            while (!queue.isEmpty() && Long.compareUnsigned(queue.peek().seqno, seqno) < 1) {
                BufferedEvent event = queue.poll();
                try {
                    switch (event.type) {
                        case DATA:
                            this.dataEventHandler.onEvent(event.flowController, event.event);
                            break;
                        case CONTROL:
                            this.controlEventHandler.onEvent(event.flowController, event.event);
                            break;
                        case STREAM_END_OK:
                            this.eventBus.publish(new StreamEndEvent(vbucket, StreamEndReason.OK));
                            break;
                        default:
                            throw new RuntimeException("Unexpected event type: " + event.type);
                    }
                }
                catch (Throwable t) {
                    LOGGER.error("Event handler threw exception", t);
                }
            }
        }
    }

    /**
     * An event held in a partition queue together with what is needed to dispatch
     * or discard it later.
     */
    static class BufferedEvent {
        private final long seqno;
        /** Raw event buffer; {@code null} for stream-end markers. */
        private final ByteBuf event;
        /** Flow controller for the event; {@code null} for stream-end markers. */
        private final ChannelFlowController flowController;
        private final Type type;

        BufferedEvent(long seqno, ByteBuf event, ChannelFlowController flowController, Type type) {
            this.seqno = seqno;
            this.event = event;
            this.flowController = flowController;
            this.type = type;
        }

        /**
         * Creates a stream-end marker that carries no buffer and no flow controller.
         *
         * @param seqno the seqno at which the marker should be released
         * @return a marker of type {@link Type#STREAM_END_OK}
         */
        static BufferedEvent streamEnd(long seqno) {
            return new BufferedEvent(seqno, null, null, Type.STREAM_END_OK);
        }

        /**
         * Acknowledges the event to its flow controller and releases its buffer.
         * A failure to acknowledge is logged at debug level; the buffer is released
         * regardless. Events without a buffer (stream-end markers) are a no-op.
         */
        void discard() {
            if (this.event == null) {
                // Stream-end markers have nothing to ack or release. Without this guard
                // the release in the finally block would throw a NullPointerException
                // and abort the caller's clear/rollback loop.
                return;
            }
            try {
                this.flowController.ack(this.event);
            }
            catch (Throwable t) {
                LOGGER.debug("Failed to ack buffered event; channel already closed?", t);
            }
            finally {
                this.event.release();
            }
        }

        /** How a buffered event is dispatched when it is released. */
        static enum Type {
            DATA,
            CONTROL,
            STREAM_END_OK;

        }
    }
}

