/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.highlevel.internal;

import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.DataEventHandler;
import com.couchbase.client.dcp.SystemEventHandler;
import com.couchbase.client.dcp.core.event.CouchbaseEvent;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.events.DcpFailureEvent;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.highlevel.Deletion;
import com.couchbase.client.dcp.highlevel.FailoverLog;
import com.couchbase.client.dcp.highlevel.Mutation;
import com.couchbase.client.dcp.highlevel.Rollback;
import com.couchbase.client.dcp.highlevel.SeqnoAdvanced;
import com.couchbase.client.dcp.highlevel.SnapshotDetails;
import com.couchbase.client.dcp.highlevel.SnapshotMarker;
import com.couchbase.client.dcp.highlevel.StreamEnd;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.highlevel.StreamOffset;
import com.couchbase.client.dcp.highlevel.internal.CollectionIdAndKey;
import com.couchbase.client.dcp.highlevel.internal.CollectionsManifest;
import com.couchbase.client.dcp.highlevel.internal.DatabaseChangeEvent;
import com.couchbase.client.dcp.highlevel.internal.EventDispatcher;
import com.couchbase.client.dcp.highlevel.internal.FlowControlReceipt;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpMutationMessage;
import com.couchbase.client.dcp.message.DcpSeqnoAdvancedRequest;
import com.couchbase.client.dcp.message.DcpSnapshotMarkerRequest;
import com.couchbase.client.dcp.message.DcpSystemEvent;
import com.couchbase.client.dcp.message.DcpSystemEventRequest;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.state.FailoverLogEntry;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bridges the low-level DCP client callbacks to the high-level event API.
 *
 * <p>An instance registers itself with a {@link Client} as the control event handler and
 * system event handler, and registers an internal data event handler. Every callback is
 * translated into a {@link DatabaseChangeEvent} (or a {@link StreamFailure} when translation
 * fails) and handed to the {@link EventDispatcher} supplied at registration time.
 *
 * <p>The adapter also remembers, per partition, the most recent failover log UUID and the
 * current snapshot marker so that a {@link StreamOffset} can be built for each event.
 */
public class EventHandlerAdapter implements ControlEventHandler, SystemEventHandler {
    private static final Logger log = LoggerFactory.getLogger(EventHandlerAdapter.class);

    /** Size of the per-partition lookup tables; partition (vbucket) numbers index into them. */
    private static final int MAX_PARTITIONS = 1024;

    // Opcode values read from byte offset 1 of each incoming message. Each name reflects the
    // message type the corresponding branch builds or parses.
    // NOTE(review): MessageUtil probably exposes equivalent constants - prefer those if so.
    private static final byte OPCODE_ROLLBACK = 0x00;
    private static final byte OPCODE_FAILOVER_LOG = 0x54;
    private static final byte OPCODE_SNAPSHOT_MARKER = 0x56;
    private static final byte OPCODE_MUTATION = 0x57;
    private static final byte OPCODE_DELETION = 0x58;
    private static final byte OPCODE_EXPIRATION = 0x59;
    private static final byte OPCODE_SYSTEM_EVENT = 0x5f;
    private static final byte OPCODE_SEQNO_ADVANCED = 0x64;

    private final Client dcpClient;
    private final EventDispatcher dispatcher;

    /** Most recent failover log UUID for each partition, updated when a failover log arrives. */
    private final AtomicLongArray vbucketToUuid = new AtomicLongArray(MAX_PARTITIONS);

    /** Snapshot marker currently in effect for each partition. */
    private final AtomicReferenceArray<SnapshotMarker> vbucketToCurrentSnapshot = new AtomicReferenceArray<>(MAX_PARTITIONS);

    /**
     * Handles data events (mutations, deletions, expirations).
     *
     * <p>The event buffer is always released before returning. Any failure is logged and
     * reported as a {@link StreamFailure} for the partition, or for partition -1 if the
     * failure happened before the partition could be read from the message.
     */
    private final DataEventHandler dataEventHandler = (flowController, event) -> {
        int vbucket = -1;
        try {
            FlowControlReceipt receipt = FlowControlReceipt.forMessage(flowController, event);
            vbucket = MessageUtil.getVbucket(event);
            CollectionsManifest manifest = getCurrentManifest(vbucket);
            byte opcode = event.getByte(1);
            switch (opcode) {
                case OPCODE_MUTATION:
                case OPCODE_DELETION:
                case OPCODE_EXPIRATION:
                    dispatchDocumentChange(opcode, vbucket, event, manifest, receipt);
                    return;
                default:
                    // Nobody downstream will see this message, so acknowledge it here.
                    receipt.acknowledge();
                    log.warn("Unexpected data event type: {}", MessageUtil.getShortOpcodeName(event));
            }
        } catch (Throwable t) {
            log.error("Failed to dispatch data event", t);
            dispatchOrLogError(new StreamFailure(vbucket, t));
        } finally {
            event.release();
        }
    };

    private EventHandlerAdapter(Client dcpClient, EventDispatcher dispatcher) {
        this.dcpClient = Objects.requireNonNull(dcpClient);
        this.dispatcher = Objects.requireNonNull(dispatcher);
        dcpClient.controlEventHandler(this);
        dcpClient.systemEventHandler(this);
        dcpClient.dataEventHandler(this.dataEventHandler);
    }

    /**
     * Creates an adapter and installs it as the control, system and data event handler of the
     * given client.
     *
     * @param dcpClient the client whose callbacks should be adapted; must not be null
     * @param dispatcher receives the translated high-level events; must not be null
     * @return the newly registered adapter
     * @throws NullPointerException if either argument is null
     */
    public static EventHandlerAdapter register(Client dcpClient, EventDispatcher dispatcher) {
        return new EventHandlerAdapter(dcpClient, dispatcher);
    }

    /** Forwards a high-level event to the dispatcher. */
    private void dispatch(DatabaseChangeEvent event) {
        this.dispatcher.dispatch(event);
    }

    /**
     * Forwards a stream failure to the dispatcher, logging (rather than propagating) anything
     * thrown while doing so. Used from catch blocks, where a second failure must not escape.
     */
    private void dispatchOrLogError(StreamFailure event) {
        try {
            dispatch(event);
        } catch (Throwable t) {
            log.error("Error occurred during stream failure event dispatch.", t);
        }
    }

    /**
     * Translates a mutation, deletion or expiration message and dispatches it.
     *
     * <p>If the collection ID carried by the message is not in the manifest, the document
     * change is skipped: the flow control receipt is acknowledged here (no consumer will ever
     * receive it) and a {@link SeqnoAdvanced} is dispatched in its place.
     *
     * @param opcode one of {@code OPCODE_MUTATION}, {@code OPCODE_DELETION}, {@code OPCODE_EXPIRATION}
     * @param vbucket partition the message belongs to
     * @param event the raw message; ownership stays with the caller
     * @param manifest collections manifest used to resolve the collection ID
     * @param receipt flow control receipt passed on to the dispatched event
     */
    private void dispatchDocumentChange(byte opcode, int vbucket, ByteBuf event, CollectionsManifest manifest, FlowControlReceipt receipt) {
        CollectionIdAndKey collectionIdAndKey = extractKey(vbucket, event);
        CollectionsManifest.CollectionInfo collectionInfo = manifest.getCollection(collectionIdAndKey.collectionId());
        if (collectionInfo == null) {
            log.warn("Unrecognized collection ID {} for key {}; assuming collection was deleted, and skipping", collectionIdAndKey.collectionId(), RedactableArgument.user(collectionIdAndKey.key()));
            receipt.acknowledge();
            dispatch(new SeqnoAdvanced(vbucket, newOffset(event, vbucket)));
            return;
        }

        StreamOffset offset = newOffset(event, vbucket);
        if (opcode == OPCODE_MUTATION) {
            dispatch(new Mutation(event, collectionInfo, collectionIdAndKey.key(), receipt, offset));
        } else {
            // The trailing flag is true only for the expiration opcode.
            dispatch(new Deletion(event, collectionInfo, collectionIdAndKey.key(), receipt, offset, opcode == OPCODE_EXPIRATION));
        }
    }

    /**
     * Handles system events raised by the client.
     *
     * <p>Stream end and failure events are translated and dispatched; anything else is logged
     * at debug level and ignored. A failure event without a partition is reported for
     * partition -1, as is any error thrown while dispatching.
     *
     * @param event the system event to translate
     */
    @Override
    public void onEvent(CouchbaseEvent event) {
        try {
            if (event instanceof StreamEndEvent) {
                StreamEndEvent streamEnd = (StreamEndEvent) event;
                dispatch(new StreamEnd(streamEnd.partition(), streamEnd.reason()));
            } else if (event instanceof DcpFailureEvent) {
                DcpFailureEvent fail = (DcpFailureEvent) event;
                dispatch(new StreamFailure(fail.partition().orElse(-1), fail.error()));
            } else {
                log.debug("Ignoring unrecognized system event: {}", event.toMap());
            }
        } catch (Throwable t) {
            log.error("Failed to dispatch system event", t);
            dispatchOrLogError(new StreamFailure(-1, t));
        }
    }

    /**
     * Handles control events: snapshot markers, rollbacks, failover logs, seqno-advanced
     * notifications and DCP system events.
     *
     * <p>The message is acknowledged to the flow controller up front and the buffer is always
     * released before returning. Any failure is logged and reported as a {@link StreamFailure}
     * for partition -1.
     *
     * @param flowController flow controller of the channel the message arrived on
     * @param event the raw control message
     */
    @Override
    public void onEvent(ChannelFlowController flowController, ByteBuf event) {
        try {
            flowController.ack(event);
            byte opcode = event.getByte(1);
            switch (opcode) {
                case OPCODE_SNAPSHOT_MARKER: {
                    SnapshotMarker marker = new SnapshotMarker(DcpSnapshotMarkerRequest.startSeqno(event), DcpSnapshotMarkerRequest.endSeqno(event));
                    int flags = DcpSnapshotMarkerRequest.flags(event);
                    int vbucket = MessageUtil.getVbucket(event);
                    this.vbucketToCurrentSnapshot.set(vbucket, marker);
                    dispatch(new SnapshotDetails(vbucket, flags, marker));
                    return;
                }
                case OPCODE_ROLLBACK: {
                    int vbucket = RollbackMessage.vbucket(event);
                    Consumer<Throwable> defaultErrorHandler = t -> dispatch(new StreamFailure(vbucket, t));
                    dispatch(new Rollback(this.dcpClient, vbucket, RollbackMessage.seqno(event), defaultErrorHandler));
                    return;
                }
                case OPCODE_FAILOVER_LOG: {
                    int vbucket = DcpFailoverLogResponse.vbucket(event);
                    List<FailoverLogEntry> failoverLog = DcpFailoverLogResponse.entries(event);
                    // The UUID of the first entry becomes the partition's current UUID.
                    this.vbucketToUuid.set(vbucket, failoverLog.get(0).getUuid());
                    dispatch(new FailoverLog(vbucket, failoverLog));
                    return;
                }
                case OPCODE_SEQNO_ADVANCED: {
                    int vbucket = MessageUtil.getVbucket(event);
                    long seqno = DcpSeqnoAdvancedRequest.getSeqno(event);
                    // Record a single-seqno snapshot before building the new offset.
                    this.vbucketToCurrentSnapshot.set(vbucket, new SnapshotMarker(seqno, seqno));
                    log.debug("vbucket {} seqno advanced to {}", vbucket, seqno);
                    dispatch(new SeqnoAdvanced(vbucket, newOffset(vbucket, seqno)));
                    return;
                }
                case OPCODE_SYSTEM_EVENT: {
                    DcpSystemEvent sysEvent = DcpSystemEvent.parse(event);
                    if (sysEvent instanceof DatabaseChangeEvent) {
                        log.debug("Received system event: {}", sysEvent);
                        dispatch((DatabaseChangeEvent) sysEvent);
                    } else {
                        log.warn("Received unrecognized system event:\n{}", MessageUtil.humanize(event));
                    }
                    // Recognized or not, the system event's seqno is reported as an advance.
                    int vbucket = MessageUtil.getVbucket(event);
                    long seqno = DcpSystemEventRequest.getSeqno(event);
                    log.debug("vbucket {} seqno advanced to {} due to system event", vbucket, seqno);
                    dispatch(new SeqnoAdvanced(vbucket, newOffset(vbucket, seqno)));
                    return;
                }
                default:
                    log.warn("Unexpected control event type: {}", MessageUtil.getShortOpcodeName(MessageUtil.getOpcode(event)));
            }
        } catch (Throwable t) {
            log.error("Failed to dispatch control event", t);
            dispatchOrLogError(new StreamFailure(-1, t));
        } finally {
            event.release();
        }
    }

    /** Returns the collections manifest from the client's session state for the partition. */
    private CollectionsManifest getCurrentManifest(int vbucket) {
        return this.dcpClient.sessionState().get(vbucket).getCollectionsManifest();
    }

    /** Extracts the collection ID and document key using the partition's key extractor. */
    private CollectionIdAndKey extractKey(int vbucket, ByteBuf event) {
        return this.dcpClient.sessionState().get(vbucket).getKeyExtractor().getCollectionIdAndKey(event);
    }

    /**
     * Builds the stream offset for a data event, reading the sequence number from the message
     * via {@link DcpMutationMessage#bySeqno}.
     */
    private StreamOffset newOffset(ByteBuf dataEvent, int vbucket) {
        long seqno = DcpMutationMessage.bySeqno(dataEvent);
        return newOffset(vbucket, seqno);
    }

    /**
     * Builds the stream offset for the given partition and sequence number from the recorded
     * partition UUID, the current snapshot marker and the session state's manifest UID.
     *
     * @throws IllegalArgumentException if {@link StreamOffset} rejects the combination; the
     *     session state is logged before the exception is rethrown
     */
    private StreamOffset newOffset(int vbucket, long seqno) {
        long vbuuid = this.vbucketToUuid.get(vbucket);
        SnapshotMarker snapshot = this.vbucketToCurrentSnapshot.get(vbucket);
        long collectionsManifestUid = this.dcpClient.sessionState().get(vbucket).getCollectionsManifestUid();
        try {
            return new StreamOffset(vbuuid, seqno, snapshot, collectionsManifestUid);
        } catch (IllegalArgumentException e) {
            log.error("Invalid stream offset detected for partition {} with UUID {}: {}\nClient's view of session state: {}", vbucket, vbuuid, e.getMessage(), this.dcpClient.sessionState().get(vbucket));
            throw e;
        }
    }
}

