/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.containers.engine;

import io.camunda.zeebe.process.test.api.RecordStreamSource;
import io.camunda.zeebe.protocol.record.Record;
import io.zeebe.containers.ZeebeBrokerNode;
import io.zeebe.containers.engine.InfiniteList;
import io.zeebe.containers.exporter.DebugReceiver;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import net.jcip.annotations.ThreadSafe;
import org.agrona.collections.MutableInteger;
import org.agrona.collections.MutableLong;
import org.apiguardian.api.API;

/**
 * A {@link RecordStreamSource} backed by a {@link DebugReceiver}: every record the receiver
 * hands over is appended to the shared {@link InfiniteList}, which is exposed through
 * {@link #getRecords()}.
 *
 * <p>Also offers polling helpers that block until the record list stops growing (idle) or starts
 * growing (busy).
 */
@API(status=API.Status.INTERNAL)
@ThreadSafe
final class DebugReceiverStream implements RecordStreamSource, AutoCloseable {
    /** How often the wait helpers sample the record count. */
    private static final Duration POLLING_INTERVAL = Duration.ofMillis(100L);

    private final InfiniteList<Record<?>> records;
    private final DebugReceiver receiver;
    private final Duration idlePeriod;

    /**
     * Creates a stream with a new receiver that appends to {@code records} and a one second idle
     * period.
     *
     * @param records the list that received records are appended to
     */
    DebugReceiverStream(InfiniteList<Record<?>> records) {
        this(records, new DebugReceiver(records::add));
    }

    /**
     * Creates a stream with a new receiver that appends to {@code records}.
     *
     * @param records the list that received records are appended to
     * @param idlePeriod how long the record count must stay unchanged to be considered idle
     */
    DebugReceiverStream(InfiniteList<Record<?>> records, Duration idlePeriod) {
        this(records, new DebugReceiver(records::add), idlePeriod);
    }

    /**
     * Creates a stream around an existing receiver, with a one second idle period.
     *
     * @param records the list exposed through {@link #getRecords()}
     * @param receiver the receiver to start, stop and acknowledge through
     */
    DebugReceiverStream(InfiniteList<Record<?>> records, DebugReceiver receiver) {
        this(records, receiver, Duration.ofSeconds(1L));
    }

    /**
     * Creates a stream around an existing receiver.
     *
     * @param records the list exposed through {@link #getRecords()}
     * @param receiver the receiver to start, stop and acknowledge through
     * @param idlePeriod how long the record count must stay unchanged to be considered idle
     */
    DebugReceiverStream(InfiniteList<Record<?>> records, DebugReceiver receiver, Duration idlePeriod) {
        this.records = records;
        this.receiver = receiver;
        this.idlePeriod = idlePeriod;
    }

    /**
     * Starts the receiver, then configures every given broker with a debug exporter pointing at
     * the port the receiver is bound to.
     *
     * @param brokers the brokers to configure
     */
    void start(Collection<? extends ZeebeBrokerNode<?>> brokers) {
        this.receiver.start();
        int port = this.receiver.serverAddress().getPort();
        brokers.forEach(broker -> broker.withDebugExporter(port));
    }

    /** Stops the underlying receiver. */
    void stop() {
        this.receiver.stop();
    }

    /**
     * Forwards an acknowledgement to the underlying receiver.
     *
     * @param partitionId the partition the position belongs to
     * @param position the position to acknowledge
     */
    void acknowledge(int partitionId, long position) {
        this.receiver.acknowledge(partitionId, position);
    }

    /** Equivalent to {@link #stop()}. */
    @Override
    public void close() {
        this.stop();
    }

    /**
     * Returns the list that records are collected into.
     *
     * @return the backing record list (not a copy)
     */
    public Iterable<Record<?>> getRecords() {
        return this.records;
    }

    /**
     * Blocks until the number of collected records has stayed unchanged for at least the
     * configured idle period.
     *
     * @param timeout the maximum time to wait
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the idle state is not reached before {@code timeout} elapses
     */
    void waitForIdleState(Duration timeout) throws InterruptedException, TimeoutException {
        MutableInteger lastCount = new MutableInteger(this.records.size());
        // The idle window is measured from the last poll at which the count was seen to change
        // (or from now, if it never changes).
        MutableLong lastChangeNanos = new MutableLong(System.nanoTime());
        long mustHoldNanos = this.idlePeriod.toNanos();
        // Report the configured idle period instead of a hard-coded "1 second".
        String description = "until no records are exported for " + this.idlePeriod.toMillis() + " ms";
        this.awaitConditionHolds(timeout, POLLING_INTERVAL, description, () -> {
            int currentCount = this.records.size();
            long now = System.nanoTime();
            if (currentCount != lastCount.get()) {
                lastCount.set(currentCount);
                lastChangeNanos.set(now);
            }
            return now - lastChangeNanos.get() >= mustHoldNanos;
        });
    }

    /**
     * Blocks until the number of collected records differs between two consecutive polls.
     *
     * @param timeout the maximum time to wait
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if no change is observed before {@code timeout} elapses
     */
    void waitForBusyState(Duration timeout) throws InterruptedException, TimeoutException {
        MutableInteger lastCount = new MutableInteger(this.records.size());
        this.awaitConditionHolds(timeout, POLLING_INTERVAL, "until a record is exported", () -> {
            int previousCount = lastCount.get();
            lastCount.set(this.records.size());
            return previousCount != lastCount.get();
        });
    }

    /**
     * Polls {@code condition} every {@code pollInterval} until it returns {@code true}.
     *
     * <p>The deadline is checked before each evaluation of the condition; once the condition
     * holds the method returns immediately, so a success observed just before the deadline is
     * never reported as a timeout.
     *
     * @param timeout the maximum time to wait
     * @param pollInterval the pause between two evaluations of the condition
     * @param description appended to "Timed out waiting " in the timeout message
     * @param condition the condition to wait for
     * @throws TimeoutException if the deadline passes before the condition holds
     * @throws InterruptedException if the calling thread is, or gets, interrupted
     */
    private void awaitConditionHolds(Duration timeout, Duration pollInterval, String description, BooleanSupplier condition) throws TimeoutException, InterruptedException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        long pollMillis = pollInterval.toMillis();
        while (true) {
            // Surface a pending interrupt instead of returning as if the condition had held.
            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted while waiting " + description);
            }
            // Compare via subtraction so the check stays correct if nanoTime wraps around.
            if (deadlineNanos - System.nanoTime() <= 0L) {
                throw new TimeoutException("Timed out waiting " + description);
            }
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(pollMillis);
        }
    }
}

