/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.impl.AssertUtils;
import io.axoniq.axonserver.grpc.FlowControl;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a {@link ClientResponseObserver} that applies permit-based flow control to the
 * stream it observes.
 *
 * <p>When flow control is active ({@code refillBatch > 0}) the stream is started with
 * {@code permits} initial permits, and every time {@code refillBatch} messages have been reported
 * through {@link #markConsumed()} another {@code refillBatch} permits are requested. When
 * {@code refillBatch} is zero or negative, none of the flow-control logic in this class runs.
 *
 * @param <IN>  the type of message received by this observer
 * @param <OUT> the type of message sent on the outbound stream
 */
public abstract class FlowControlledStream<IN, OUT> implements ClientResponseObserver<OUT, IN> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of messages consumed since the last refill; wraps back to 0 on every refill. */
    private final AtomicInteger permitsConsumed = new AtomicInteger();
    private final String clientId;
    private final int permits;
    private final int refillBatch;
    /** Pre-built refill request carrying {@code refillBatch} permits, reused for every refill. */
    private final FlowControl flowControl;
    /** Assigned in {@link #beforeStart(ClientCallStreamObserver)}; {@code null} until then. */
    private ClientCallStreamObserver<OUT> outboundStream;

    /**
     * Creates a flow-controlled stream. The arguments are validated through
     * {@code AssertUtils.assertParameter}.
     *
     * @param clientId    the client identifier placed in every flow-control message; must not be
     *                    {@code null}
     * @param permits     the initial number of permits; must be greater than 0
     * @param refillBatch the number of consumed messages after which the same number of permits is
     *                    requested again; must not exceed {@code permits}. A value of 0 or less
     *                    disables flow control
     */
    protected FlowControlledStream(String clientId, int permits, int refillBatch) {
        AssertUtils.assertParameter(permits > 0, "Permits must be > 0");
        // The check allows refillBatch == permits, so the message states "smaller than or equal to".
        AssertUtils.assertParameter(refillBatch <= permits,
                                    "The refillBatch must be smaller than or equal to the number of permits");
        AssertUtils.assertParameter(clientId != null, "The clientId must not be null");
        this.clientId = clientId;
        this.permits = permits;
        this.refillBatch = refillBatch;
        this.flowControl = FlowControl.newBuilder()
                                      .setPermits(refillBatch)
                                      .setClientId(clientId)
                                      .build();
    }

    /**
     * Resets the consumed-message counter and sends the initial flow-control message, carrying the
     * full {@code permits} amount, on the outbound stream. Does nothing when flow control is
     * disabled ({@code refillBatch <= 0}). Nothing is sent when
     * {@link #buildInitialFlowControlMessage(FlowControl)} returns {@code null}.
     *
     * <p>The outbound stream is only assigned in {@link #beforeStart(ClientCallStreamObserver)},
     * so this method is expected to be called after the call has been started.
     */
    public void enableFlowControl() {
        if (this.refillBatch <= 0) {
            return;
        }
        this.permitsConsumed.set(0);
        FlowControl initialPermits = FlowControl.newBuilder()
                                                .setPermits(this.permits)
                                                .setClientId(this.clientId)
                                                .build();
        OUT initialMessage = this.buildInitialFlowControlMessage(initialPermits);
        if (initialMessage != null) {
            this.outboundStream().onNext(initialMessage);
        }
    }

    /**
     * Wraps the given flow-control instruction in a message of the outbound type.
     *
     * @param flowControl the flow-control instruction to wrap
     * @return the message to send on the outbound stream, or {@code null} if nothing should be sent
     */
    protected abstract OUT buildFlowControlMessage(FlowControl flowControl);

    /**
     * Builds the first flow-control message sent by {@link #enableFlowControl()}. The default
     * implementation delegates to {@link #buildFlowControlMessage(FlowControl)}.
     *
     * @param flowControl the flow-control instruction carrying the initial number of permits
     * @return the message to send on the outbound stream, or {@code null} if nothing should be sent
     */
    protected OUT buildInitialFlowControlMessage(FlowControl flowControl) {
        return this.buildFlowControlMessage(flowControl);
    }

    /**
     * Records that one message has been consumed. On every {@code refillBatch}-th invocation the
     * counter wraps to 0, {@code refillBatch} additional messages are requested from the stream
     * and, if {@link #buildFlowControlMessage(FlowControl)} yields a non-null message, that message
     * is sent on the outbound stream. Does nothing when flow control is disabled.
     */
    protected void markConsumed() {
        if (this.refillBatch <= 0) {
            return;
        }
        // Atomically advance the counter, wrapping to 0 once a full batch has been consumed.
        int consumedInBatch = this.permitsConsumed.updateAndGet(
                current -> current == this.refillBatch - 1 ? 0 : current + 1
        );
        if (consumedInBatch != 0) {
            return;
        }
        OUT permitsRequest = this.buildFlowControlMessage(this.flowControl);
        logger.debug("Requesting additional {} permits", this.refillBatch);
        this.outboundStream.request(this.refillBatch);
        if (permitsRequest != null) {
            this.outboundStream().onNext(permitsRequest);
        }
    }

    /**
     * Stores the outbound stream and, when flow control is enabled, switches it from automatic
     * requests to manual requests with {@code permits} as the initial request amount.
     *
     * @param requestStream the stream used to send messages and to request further inbound messages
     */
    public void beforeStart(ClientCallStreamObserver<OUT> requestStream) {
        if (this.refillBatch > 0) {
            requestStream.disableAutoRequestWithInitial(this.permits);
        }
        this.outboundStream = requestStream;
    }

    /**
     * Returns the outbound stream handed to {@link #beforeStart(ClientCallStreamObserver)}.
     *
     * @return the outbound stream, or {@code null} if {@code beforeStart} has not been invoked yet
     */
    protected ClientCallStreamObserver<OUT> outboundStream() {
        return this.outboundStream;
    }
}

