/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.testserver.websocket;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.tracking.ClaimSegment;
import io.fluxcapacitor.common.api.tracking.ClaimSegmentResult;
import io.fluxcapacitor.common.api.tracking.DisconnectTracker;
import io.fluxcapacitor.common.api.tracking.GetPosition;
import io.fluxcapacitor.common.api.tracking.GetPositionResult;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.common.api.tracking.Read;
import io.fluxcapacitor.common.api.tracking.ReadFromIndex;
import io.fluxcapacitor.common.api.tracking.ReadFromIndexResult;
import io.fluxcapacitor.common.api.tracking.ReadResult;
import io.fluxcapacitor.common.api.tracking.ResetPosition;
import io.fluxcapacitor.common.api.tracking.StorePosition;
import io.fluxcapacitor.common.tracking.DefaultTrackingStrategy;
import io.fluxcapacitor.common.tracking.InMemoryPositionStore;
import io.fluxcapacitor.common.tracking.MessageStore;
import io.fluxcapacitor.common.tracking.PositionStore;
import io.fluxcapacitor.common.tracking.TrackingStrategy;
import io.fluxcapacitor.common.tracking.WebSocketTracker;
import io.fluxcapacitor.testserver.websocket.Handle;
import io.fluxcapacitor.testserver.websocket.WebsocketEndpoint;
import jakarta.websocket.CloseReason;
import jakarta.websocket.Session;
import java.beans.ConstructorProperties;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Websocket endpoint that serves consumer/tracking requests for one {@link MessageType}.
 *
 * <p>Each {@code @Handle}-annotated method accepts one request type ({@link Read},
 * {@link ClaimSegment}, {@link StorePosition}, {@link ResetPosition}, {@link DisconnectTracker},
 * {@link ReadFromIndex}, {@link GetPosition}) and delegates to the configured
 * {@link TrackingStrategy}, {@link MessageStore} or {@link PositionStore}.
 *
 * <p>NOTE(review): how {@code @Handle} methods are discovered and how their return values are sent
 * back to the client is decided by {@link WebsocketEndpoint}, which is not visible here.
 */
public class ConsumerEndpoint extends WebsocketEndpoint {
    private static final Logger log = LoggerFactory.getLogger(ConsumerEndpoint.class);

    private final TrackingStrategy trackingStrategy;
    private final MessageStore messageStore;
    private final PositionStore positionStore;
    private final MessageType messageType;

    /**
     * Creates an endpoint from explicitly supplied collaborators.
     *
     * @param trackingStrategy strategy that receives trackers and disconnect requests
     * @param messageStore     store queried directly for {@link ReadFromIndex} requests
     * @param positionStore    store used to read, store and reset consumer positions
     * @param messageType      message type passed to every tracker created by this endpoint
     */
    @ConstructorProperties(value = {"trackingStrategy", "messageStore", "positionStore", "messageType"})
    public ConsumerEndpoint(TrackingStrategy trackingStrategy, MessageStore messageStore,
                            PositionStore positionStore, MessageType messageType) {
        this.trackingStrategy = trackingStrategy;
        this.messageStore = messageStore;
        this.positionStore = positionStore;
        this.messageType = messageType;
    }

    /**
     * Creates an endpoint backed by a {@link DefaultTrackingStrategy} over the given message store
     * and a fresh {@link InMemoryPositionStore}.
     *
     * @param messageStore store the default tracking strategy is built on
     * @param messageType  message type passed to every tracker created by this endpoint
     */
    public ConsumerEndpoint(MessageStore messageStore, MessageType messageType) {
        this(new DefaultTrackingStrategy(messageStore), messageStore, new InMemoryPositionStore(), messageType);
    }

    /**
     * Hands a {@link Read} request to the tracking strategy. The tracker's callback answers the
     * session with a {@link ReadResult} that carries the request id and the delivered batch.
     */
    @Handle
    void handle(Read read, Session session) {
        WebSocketTracker tracker = new WebSocketTracker(
                read, messageType, getClientId(session), session.getId(),
                (messages, position) -> doSendResult(
                        session, new ReadResult(read.getRequestId(), (MessageBatch) messages)));
        trackingStrategy.getBatch(tracker, positionStore);
    }

    /**
     * Hands a {@link ClaimSegment} request to the tracking strategy. The tracker's callback answers
     * the session with a {@link ClaimSegmentResult}; when the callback receives no position, a new
     * position from {@link Position#newPosition()} is reported instead.
     */
    @Handle
    void handle(ClaimSegment claim, Session session) {
        WebSocketTracker tracker = new WebSocketTracker(
                claim, messageType, getClientId(session), session.getId(),
                (messages, position) -> doSendResult(
                        session,
                        new ClaimSegmentResult(
                                claim.getRequestId(),
                                position != null ? position : Position.newPosition(),
                                messages.getSegment())));
        trackingStrategy.claimSegment(tracker, positionStore);
    }

    /**
     * Stores the last handled index of a consumer for the given segment.
     *
     * @return the future returned by the position store
     */
    @Handle
    CompletableFuture<Void> handle(StorePosition storePosition) {
        return positionStore.storePosition(
                storePosition.getConsumer(),
                storePosition.getSegment(),
                storePosition.getLastIndex());
    }

    /**
     * Resets the position of a consumer to the index given in the request.
     *
     * @return the future returned by the position store
     */
    @Handle
    CompletableFuture<Void> handle(ResetPosition resetPosition) {
        return positionStore.resetPosition(
                resetPosition.getConsumer(),
                resetPosition.getLastIndex());
    }

    /**
     * Disconnects every tracker whose consumer name and tracker id both equal those in the request
     * (null-safe comparison), forwarding the request's "send final empty batch" flag.
     */
    @Handle
    void handle(DisconnectTracker disconnectTracker) {
        trackingStrategy.disconnectTrackers(
                tracker -> Objects.equals(tracker.getConsumerName(), disconnectTracker.getConsumer())
                           && Objects.equals(tracker.getTrackerId(), disconnectTracker.getTrackerId()),
                disconnectTracker.isSendFinalEmptyBatch());
    }

    /**
     * Reads a batch straight from the message store, using the request's minimum index and maximum
     * size, and wraps it in a result that carries the request id.
     */
    @Handle
    ReadFromIndexResult handle(ReadFromIndex request) {
        // NOTE(review): the meaning of the trailing 'true' flag is defined by MessageStore.getBatch — confirm there.
        return new ReadFromIndexResult(
                request.getRequestId(),
                messageStore.getBatch(request.getMinIndex(), request.getMaxSize(), true));
    }

    /**
     * Looks up the stored position of the requested consumer and wraps it in a result that carries
     * the request id.
     */
    @Handle
    GetPositionResult handle(GetPosition request) {
        return new GetPositionResult(
                request.getRequestId(),
                positionStore.position(request.getConsumer()));
    }

    /**
     * Disconnects all {@link WebSocketTracker}s created for the closing session (matched by session
     * id, without a final empty batch) and then delegates to the superclass.
     */
    @Override
    public void onClose(Session session, CloseReason closeReason) {
        trackingStrategy.disconnectTrackers(
                tracker -> tracker instanceof WebSocketTracker
                           && ((WebSocketTracker) tracker).getSessionId().equals(session.getId()),
                false);
        super.onClose(session, closeReason);
    }

    /**
     * Disconnects every tracker known to the tracking strategy (without a final empty batch) and
     * then delegates to the superclass.
     */
    @Override
    protected void shutDown() {
        trackingStrategy.disconnectTrackers(tracker -> true, false);
        super.shutDown();
    }

    /**
     * Returns a short description containing the endpoint's message type. The label in the output
     * reads {@code logType}; it is runtime text and is deliberately left unchanged.
     */
    @Override
    public String toString() {
        return "ConsumerEndpoint{logType='" + messageType + "'}";
    }
}

