/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.streams.PersistentStreamEvent;
import io.axoniq.axonserver.grpc.streams.Requests;
import io.axoniq.axonserver.grpc.streams.StreamRequest;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferedPersistentStreamSegment
extends AbstractBufferedStream<PersistentStreamEvent, StreamRequest>
implements PersistentStreamSegment {
    private static final Logger logger = LoggerFactory.getLogger(BufferedPersistentStreamSegment.class);
    private static final PersistentStreamEvent TERMINAL_MESSAGE = PersistentStreamEvent.newBuilder().setEvent(EventWithToken.newBuilder().setToken(-1729L).build()).build();
    private final Set<Runnable> onSegmentClosedCallbacks = new CopyOnWriteArraySet<Runnable>();
    private final String streamId;
    private final int segment;
    private final LongConsumer progressCallback;
    private final Consumer<String> errorCallback;
    private final AtomicBoolean closed = new AtomicBoolean();
    private Runnable localOnAvailableCallback = () -> {};

    public BufferedPersistentStreamSegment(String streamId, int segment, int bufferSize, int refillBatch, LongConsumer progressCallback, Consumer<String> errorCallback) {
        super("ignoredClientId", bufferSize, refillBatch);
        this.streamId = streamId;
        this.segment = segment;
        this.progressCallback = progressCallback;
        this.errorCallback = errorCallback;
    }

    @Override
    public void onSegmentClosed(Runnable callback) {
        this.onSegmentClosedCallbacks.add(callback);
    }

    @Override
    public void onCompleted() {
        super.onCompleted();
        this.closed.set(true);
        this.onSegmentClosedCallbacks.forEach(Runnable::run);
    }

    @Override
    public void acknowledge(long token) {
        if (this.closed.get()) {
            logger.debug("{}: Acknowledging position {} for segment {} after closing the segment", new Object[]{this.streamId, token, this.segment});
        }
        this.progressCallback.accept(token);
    }

    @Override
    public void error(String error) {
        this.errorCallback.accept(error);
    }

    @Override
    public boolean isClosed() {
        return this.closed.get();
    }

    @Override
    public int segment() {
        return this.segment;
    }

    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            logger.info("{}: Close segment {}", (Object)this.streamId, (Object)this.segment);
            this.localOnAvailableCallback.run();
        }
    }

    @Override
    public void onAvailable(Runnable callback) {
        super.onAvailable(callback);
        this.localOnAvailableCallback = callback;
    }

    @Override
    protected PersistentStreamEvent terminalMessage() {
        return TERMINAL_MESSAGE;
    }

    @Override
    protected StreamRequest buildFlowControlMessage(FlowControl flowControl) {
        return StreamRequest.newBuilder().setRequests(Requests.newBuilder().setSegment(this.segment).setRequests((int)flowControl.getPermits())).build();
    }

    public String toString() {
        return this.streamId + "[" + this.segment + "]";
    }
}

