/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.testserver.endpoints;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.VoidResult;
import io.fluxcapacitor.common.api.tracking.ClaimSegment;
import io.fluxcapacitor.common.api.tracking.ClaimSegmentResult;
import io.fluxcapacitor.common.api.tracking.DisconnectTracker;
import io.fluxcapacitor.common.api.tracking.GetPosition;
import io.fluxcapacitor.common.api.tracking.GetPositionResult;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Read;
import io.fluxcapacitor.common.api.tracking.ReadFromIndex;
import io.fluxcapacitor.common.api.tracking.ReadFromIndexResult;
import io.fluxcapacitor.common.api.tracking.ReadResult;
import io.fluxcapacitor.common.api.tracking.ResetPosition;
import io.fluxcapacitor.common.api.tracking.StorePosition;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import io.fluxcapacitor.testserver.Handle;
import io.fluxcapacitor.testserver.WebsocketEndpoint;
import io.fluxcapacitor.testserver.endpoints.WebSocketTrackerRead;
import java.beans.ConstructorProperties;
import java.util.List;
import java.util.Objects;
import javax.websocket.CloseReason;
import javax.websocket.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Websocket endpoint that answers tracking requests for one {@link MessageType} by delegating
 * to an {@link InMemoryMessageStore}.
 *
 * <p>Every {@link Handle}-annotated method accepts one request type from the tracking API.
 * NOTE(review): dispatching to these methods and sending back their return values is presumably
 * done by {@link WebsocketEndpoint}; that code is not visible from this class.
 */
public class ConsumerEndpoint extends WebsocketEndpoint {
    private static final Logger log = LoggerFactory.getLogger(ConsumerEndpoint.class);
    private final InMemoryMessageStore store;
    private final MessageType messageType;

    /**
     * Starts a read on the store and sends a {@link ReadResult} to the session once the store
     * completes the returned future.
     *
     * <p>If the store's future fails, the failure is logged and nothing is sent to the session.
     *
     * @param read    the read request; its request id is echoed in the result
     * @param session the websocket session that issued the request
     */
    @Handle
    public void handle(Read read, Session session) {
        WebSocketTrackerRead trackerRead =
                new WebSocketTrackerRead(read, getClientId(session), session.getId(), messageType);
        store.read(trackerRead).whenComplete((batch, error) -> {
            if (error != null) {
                log.error("Failed to complete read", error);
                return;
            }
            // The stage returned by whenComplete is discarded, so anything thrown while building
            // or sending the result would vanish without a trace unless it is caught here.
            try {
                sendResult(session, new ReadResult(read.getRequestId(), (MessageBatch) batch));
            } catch (Exception e) {
                log.error("Failed to send read result", e);
            }
        });
    }

    /**
     * Claims a segment on the store and sends a {@link ClaimSegmentResult} to the session once
     * the store completes the returned future. The result carries the consumer's position as
     * reported by the store at the time the claim completes.
     *
     * <p>If the store's future fails, the failure is logged and nothing is sent to the session.
     *
     * @param read    the claim request; its request id is echoed in the result
     * @param session the websocket session that issued the request
     */
    @Handle
    public void handle(ClaimSegment read, Session session) {
        WebSocketTrackerRead trackerRead =
                new WebSocketTrackerRead(read, getClientId(session), session.getId(), messageType);
        store.claimSegment(trackerRead).whenComplete((segment, error) -> {
            if (error != null) {
                log.error("Failed to complete claim segment", error);
                return;
            }
            // Same reasoning as for reads: the position lookup and the send both run inside a
            // callback whose resulting stage nobody observes, so failures must be logged here.
            try {
                sendResult(session, new ClaimSegmentResult(
                        read.getRequestId(), store.getPosition(read.getConsumer()), (int[]) segment));
            } catch (Exception e) {
                log.error("Failed to send claim segment result", e);
            }
        });
    }

    /**
     * Stores the last index of the given consumer for the given segment.
     *
     * @param storePosition the request holding consumer, segment and last index
     * @return an acknowledgement carrying the request id
     */
    @Handle
    public VoidResult handle(StorePosition storePosition) {
        store.storePosition(
                storePosition.getConsumer(), storePosition.getSegment(), storePosition.getLastIndex());
        return new VoidResult(storePosition.getRequestId());
    }

    /**
     * Resets the position of the given consumer to the requested last index.
     *
     * @param resetPosition the request holding consumer and last index
     * @return an acknowledgement carrying the request id
     */
    @Handle
    public VoidResult handle(ResetPosition resetPosition) {
        store.resetPosition(resetPosition.getConsumer(), resetPosition.getLastIndex());
        return new VoidResult(resetPosition.getRequestId());
    }

    /**
     * Asks the store to disconnect a single tracker of a consumer. No result is returned.
     *
     * @param disconnectTracker the request holding consumer, tracker id and the
     *                          "send final empty batch" flag that is forwarded to the store
     */
    @Handle
    public void handle(DisconnectTracker disconnectTracker) {
        store.disconnectTracker(
                disconnectTracker.getConsumer(),
                disconnectTracker.getTrackerId(),
                disconnectTracker.isSendFinalEmptyBatch());
    }

    /**
     * Reads up to {@code maxSize} messages from the store, starting from the requested index.
     *
     * @param read the request holding the minimum index and maximum batch size
     * @return the messages returned by the store, tagged with the request id
     */
    @Handle
    public ReadFromIndexResult handle(ReadFromIndex read) {
        // NOTE(review): the store is passed minIndex - 1, presumably because its lower bound is
        // exclusive while the request's minimum index is inclusive - confirm against the store.
        List<SerializedMessage> batch = store.readFromIndex(read.getMinIndex() - 1L, read.getMaxSize());
        return new ReadFromIndexResult(read.getRequestId(), batch);
    }

    /**
     * Looks up the current position of a consumer.
     *
     * @param getPosition the request holding the consumer name
     * @return the position reported by the store, tagged with the request id
     */
    @Handle
    public GetPositionResult handle(GetPosition getPosition) {
        return new GetPositionResult(
                getPosition.getRequestId(), store.getPosition(getPosition.getConsumer()));
    }

    /**
     * Runs the superclass close handling and then disconnects every tracker whose session id
     * equals the id of the closed session.
     *
     * @param session     the session that was closed
     * @param closeReason the reason reported for the close
     */
    @Override
    public void onClose(Session session, CloseReason closeReason) {
        super.onClose(session, closeReason);
        String closedSessionId = session.getId();
        store.disconnectTrackersMatching(
                tracker -> Objects.equals(tracker.getSessionId(), closedSessionId));
    }

    /**
     * Creates an endpoint backed by the given store.
     *
     * @param store       the message store all requests are delegated to
     * @param messageType the message type passed along with every tracker read created here
     */
    @ConstructorProperties(value={"store", "messageType"})
    public ConsumerEndpoint(InMemoryMessageStore store, MessageType messageType) {
        this.store = store;
        this.messageType = messageType;
    }
}

