/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.companion;

import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;

class RecordsSubscriber<T, SELF extends RecordsSubscriber<T, SELF>>
implements MultiSubscriber<T>,
Iterable<T> {
    public static Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10L);
    private final CountDownLatch terminal = new CountDownLatch(1);
    private final AtomicReference<Subscription> subscription = new AtomicReference();
    private final AtomicLong requested = new AtomicLong();
    private final List<T> records = new CopyOnWriteArrayList<T>();
    private final AtomicLong received = new AtomicLong();
    private final AtomicReference<T> lastReceived = new AtomicReference();
    private final AtomicReference<T> firstReceived = new AtomicReference();
    private final AtomicReference<Throwable> failure = new AtomicReference();
    private final AtomicBoolean completed = new AtomicBoolean();
    private boolean cancelled;
    private final List<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();

    public RecordsSubscriber(long requested) {
        this.requested.set(requested);
    }

    public RecordsSubscriber() {
        this(0L);
    }

    protected SELF self() {
        return (SELF)this;
    }

    public long count() {
        return this.received.get();
    }

    public T getFirstRecord() {
        return this.firstReceived.get();
    }

    public T getLastRecord() {
        return this.lastReceived.get();
    }

    public List<T> getRecords() {
        return this.records;
    }

    public SELF awaitNextRecord() {
        return this.awaitNextRecords(1);
    }

    public SELF awaitNextRecord(Duration duration) {
        return this.awaitNextRecords(1, 1, duration);
    }

    public SELF awaitNextRecords(int number) {
        return this.awaitNextRecords(number, DEFAULT_TIMEOUT);
    }

    public SELF awaitNextRecords(int number, int request) {
        return this.awaitNextRecords(number, request, DEFAULT_TIMEOUT);
    }

    public SELF awaitNextRecords(int number, Duration duration) {
        return this.awaitNextRecords(number, number, duration);
    }

    public SELF awaitNextRecords(int number, int request, Duration duration) {
        if (this.hasCompleted() || this.getFailure() != null) {
            if (this.hasCompleted()) {
                throw new AssertionError((Object)"Expecting a next records, but a completion event has already being received");
            }
            throw new AssertionError((Object)("Expecting a next records, but a failure event has already being received: " + this.getFailure()));
        }
        this.awaitNextRecordEvents(number, request, duration);
        return this.self();
    }

    public SELF awaitRecords(int number) {
        return this.awaitRecords(number, DEFAULT_TIMEOUT);
    }

    public SELF awaitRecords(int number, Duration duration) {
        long receivedCount = this.count();
        if (receivedCount > (long)number) {
            throw new AssertionError((Object)("Expected the number of records to be " + number + ", but it's already " + receivedCount));
        }
        if (this.isCancelled() || this.hasCompleted() || this.getFailure() != null) {
            if (receivedCount != (long)number) {
                throw new AssertionError((Object)("Expected the number of records to be " + number + ", but received " + receivedCount + " and we received a terminal event already"));
            }
            return this.self();
        }
        this.awaitRecordEvents(number, duration);
        return this.self();
    }

    public SELF awaitCompletion() {
        return this.awaitCompletion(DEFAULT_TIMEOUT);
    }

    public SELF awaitCompletion(Duration duration) {
        return this.awaitCompletion((t, c) -> {}, duration);
    }

    public SELF awaitCompletion(BiConsumer<Throwable, Boolean> assertion) {
        return this.awaitCompletion(assertion, DEFAULT_TIMEOUT);
    }

    public SELF awaitCompletion(BiConsumer<Throwable, Boolean> assertion, Duration duration) {
        try {
            this.awaitEvent(this.terminal, duration);
        }
        catch (TimeoutException e) {
            throw new AssertionError((Object)("No completion event received in the last " + duration.toMillis() + " ms"));
        }
        Throwable throwable = this.failure.get();
        try {
            assertion.accept(throwable, this.cancelled);
            return this.self();
        }
        catch (AssertionError e) {
            throw new AssertionError("Received a failure or cancellation event, but did not pass the validation: " + e, (Throwable)((Object)e));
        }
    }

    private void awaitEvent(CountDownLatch latch, Duration duration) throws TimeoutException {
        if (latch.getCount() == 0L) {
            return;
        }
        try {
            if (!latch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new TimeoutException();
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    private void awaitNextRecordEvents(int number, int request, Duration duration) {
        long currentCount = this.count();
        RecordTask task = new RecordTask(this, currentCount + (long)number, this.self());
        if (request > 0) {
            this.request(request);
        }
        try {
            task.future().get(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            long received = this.count() - currentCount;
            if (this.isCancelled()) {
                throw new AssertionError((Object)("Expected " + number + " records, but received a cancellation event while waiting. Only " + received + " record(s) have been received."));
            }
            if (this.hasCompleted()) {
                throw new AssertionError((Object)("Expected " + number + " records, but received a completion event while waiting. Only " + received + " record(s) have been received."));
            }
            throw new AssertionError((Object)("Expected " + number + " records, but received a failure event while waiting: " + this.getFailure() + ". Only " + received + " record(s) have been received."));
        }
        catch (TimeoutException e) {
            long received = this.count() - currentCount;
            throw new AssertionError((Object)("Expected " + number + " records in " + duration.toMillis() + " ms, but only received " + received + " records."));
        }
    }

    private void awaitRecordEvents(int expected, Duration duration) {
        RecordTask task = new RecordTask(this, (long)expected, this.self());
        try {
            task.future().get(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            long receivedCount = this.count();
            if ((long)expected == receivedCount) {
                return;
            }
            if (this.isCancelled()) {
                throw new AssertionError((Object)("Expected " + expected + " records, but received a cancellation event while waiting. Only " + receivedCount + " records have been received."));
            }
            if (this.hasCompleted()) {
                throw new AssertionError((Object)("Expected " + expected + " records, but received a completion event while waiting. Only " + receivedCount + " records have been received."));
            }
            if (this.getFailure() != null) {
                throw new AssertionError((Object)("Expected " + expected + " records, but received a failure event while waiting: " + this.getFailure() + ". Only " + receivedCount + " records have been received."));
            }
            throw new AssertionError((Object)("Expected " + expected + " records.  Only " + receivedCount + " records have been received."));
        }
        catch (TimeoutException e) {
            long receivedCount = this.count();
            if (receivedCount >= (long)expected) {
                return;
            }
            throw new AssertionError((Object)("Expected " + expected + " records.  Only " + receivedCount + " records have been received."));
        }
    }

    public SELF cancel() {
        if (this.subscription.get() != null) {
            this.subscription.get().cancel();
        }
        this.cancelled = true;
        this.terminal.countDown();
        Event ev = new Event(null, null, false, true);
        this.eventListeners.forEach(l -> l.accept(ev));
        return this.self();
    }

    private SELF request(long req) {
        this.requested.addAndGet(req);
        if (this.subscription.get() != null) {
            this.subscription.get().request(req);
        }
        return this.self();
    }

    public void onSubscribe(Subscription s) {
        if (this.subscription.compareAndSet(null, s) && this.requested.get() > 0L) {
            s.request(this.requested.get());
        }
    }

    public synchronized void onItem(T t) {
        long newCount = this.received.incrementAndGet();
        if (newCount == 1L) {
            this.firstReceived.set(t);
        }
        this.received(t);
        this.lastReceived.set(t);
        Event ev = new Event(newCount, null, false, false);
        this.eventListeners.forEach(l -> l.accept(ev));
    }

    public void received(T t) {
        this.records.add(t);
    }

    public void onFailure(Throwable t) {
        this.failure.set(t);
        this.terminal.countDown();
        Event ev = new Event(null, t, false, false);
        this.eventListeners.forEach(l -> l.accept(ev));
    }

    public void onCompletion() {
        this.completed.set(true);
        this.terminal.countDown();
        Event ev = new Event(null, null, true, false);
        this.eventListeners.forEach(l -> l.accept(ev));
    }

    @Override
    public Iterator<T> iterator() {
        return this.getRecords().iterator();
    }

    @Override
    public Spliterator<T> spliterator() {
        return this.getRecords().spliterator();
    }

    public Throwable getFailure() {
        return this.failure.get();
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public boolean hasCompleted() {
        return this.completed.get();
    }

    public boolean hasSubscribed() {
        return this.subscription.get() != null;
    }

    protected void registerListener(EventListener listener) {
        this.eventListeners.add(listener);
    }

    protected void unregisterListener(EventListener listener) {
        this.eventListeners.remove(listener);
    }

    private class RecordTask {
        private final long targetCount;
        private final SELF subscriber;
        private final long startCount;
        final /* synthetic */ RecordsSubscriber this$0;

        /*
         * WARNING - Possible parameter corruption
         * WARNING - void declaration
         */
        public RecordTask(long subscriber, SELF SELF) {
            void targetCount;
            this.this$0 = (RecordsSubscriber)l;
            this.targetCount = targetCount;
            this.subscriber = subscriber;
            this.startCount = subscriber.count();
        }

        public CompletableFuture<Void> future() {
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            if (this.startCount >= this.targetCount) {
                future.complete(null);
                return future;
            }
            EventListener listener = event -> {
                if (event.recordCount() != null && ((Event)event).recordCount >= this.targetCount) {
                    future.complete(null);
                } else if (event.isCancellation() || event.isFailure() || event.isCompletion()) {
                    future.completeExceptionally(new NoSuchElementException("Received a terminal event while waiting for records"));
                }
            };
            ((RecordsSubscriber)this.subscriber).registerListener(listener);
            return future.whenComplete((x, f) -> ((RecordsSubscriber)this.subscriber).unregisterListener(listener));
        }
    }

    private static class Event {
        private final Long recordCount;
        private final Throwable failure;
        private final boolean completion;
        private final boolean cancellation;

        private Event(Long recordCount, Throwable failure, boolean completion, boolean cancellation) {
            this.recordCount = recordCount;
            this.failure = failure;
            this.completion = completion;
            this.cancellation = cancellation;
        }

        public Long recordCount() {
            return this.recordCount;
        }

        public boolean isRecord() {
            return this.recordCount != null;
        }

        public boolean isCancellation() {
            return this.cancellation;
        }

        public boolean isFailure() {
            return this.failure != null;
        }

        public boolean isCompletion() {
            return this.completion;
        }
    }

    private static interface EventListener
    extends Consumer<Event> {
    }
}

