/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.companion;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

public abstract class KafkaTask<T, SELF extends KafkaTask<T, SELF>>
implements Iterable<T>,
AutoCloseable {
    private final Multi<T> multi;
    private final AssertSubscriber<T> subscriber;

    public KafkaTask(Multi<T> multi) {
        this.multi = multi;
        this.subscriber = AssertSubscriber.create((long)Long.MAX_VALUE);
        this.multi.subscribe(this.subscriber);
    }

    public Multi<T> getMulti() {
        return this.multi;
    }

    @Override
    public Iterator<T> iterator() {
        return this.getRecords().iterator();
    }

    @Override
    public Spliterator<T> spliterator() {
        return this.getRecords().spliterator();
    }

    protected SELF self() {
        return (SELF)this;
    }

    public long count() {
        return this.subscriber.getItems().size();
    }

    public T getFirstRecord() {
        List records = this.subscriber.getItems();
        if (records.isEmpty()) {
            return null;
        }
        return (T)records.get(0);
    }

    public T getLastRecord() {
        return (T)this.subscriber.getLastItem();
    }

    public List<T> getRecords() {
        return this.subscriber.getItems();
    }

    private void throwFailureForCause(Runnable runnable) {
        try {
            runnable.run();
        }
        catch (AssertionError e) {
            if (this.subscriber.getFailure() != null) {
                throw new AssertionError(((Throwable)((Object)e)).getMessage(), (Throwable)((Object)e));
            }
            throw e;
        }
    }

    public SELF awaitNextRecord() {
        return this.awaitNextRecord(AssertSubscriber.DEFAULT_TIMEOUT);
    }

    public SELF awaitNextRecord(Duration duration) {
        this.throwFailureForCause(() -> this.subscriber.awaitNextItem(duration));
        return this.self();
    }

    public SELF awaitNextRecords(int number) {
        return this.awaitNextRecords(number, AssertSubscriber.DEFAULT_TIMEOUT);
    }

    public SELF awaitNextRecords(int number, Duration duration) {
        this.throwFailureForCause(() -> this.subscriber.awaitNextItems(number, duration));
        return this.self();
    }

    public SELF awaitRecords(int number) {
        return this.awaitRecords(number, AssertSubscriber.DEFAULT_TIMEOUT);
    }

    public SELF awaitRecords(int number, Duration duration) {
        this.throwFailureForCause(() -> this.subscriber.awaitItems(number, duration));
        return this.self();
    }

    public SELF awaitNoRecords(Duration duration) {
        try {
            this.awaitNextRecord(duration);
            throw new AssertionError((Object)"Received a record while expecting no records.");
        }
        catch (AssertionError e) {
            int size = this.subscriber.getItems().size();
            if (size != 0) {
                throw new AssertionError((Object)("Received " + size + " record(s) while expecting no records."));
            }
            return this.self();
        }
    }

    public SELF awaitCompletion() {
        this.subscriber.awaitCompletion();
        return this.self();
    }

    public SELF awaitCompletion(Duration duration) {
        this.subscriber.awaitCompletion(duration);
        return this.self();
    }

    public SELF awaitCompletion(BiConsumer<Throwable, Boolean> assertion) {
        return this.awaitCompletion(assertion, AssertSubscriber.DEFAULT_TIMEOUT);
    }

    public SELF awaitCompletion(BiConsumer<Throwable, Boolean> assertion, Duration duration) {
        try {
            this.subscriber.awaitCompletion(duration);
        }
        catch (AssertionError maybe) {
            this.subscriber.assertTerminated();
        }
        try {
            assertion.accept(this.subscriber.getFailure(), this.subscriber.isCancelled());
            return this.self();
        }
        catch (AssertionError e) {
            throw new AssertionError("Received a failure or cancellation event, but did not pass the validation: " + e, (Throwable)((Object)e));
        }
    }

    public SELF stop() {
        this.subscriber.cancel();
        this.subscriber.onComplete();
        return this.self();
    }

    @Override
    public void close() {
        this.stop();
    }

    public long firstOffset() {
        T firstRecord = this.getFirstRecord();
        if (firstRecord == null) {
            return -1L;
        }
        return this.offset(firstRecord);
    }

    public long lastOffset() {
        T lastRecord = this.getLastRecord();
        if (lastRecord == null) {
            return -1L;
        }
        return this.offset(lastRecord);
    }

    public Map<TopicPartition, List<T>> byTopicPartition() {
        return this.getRecords().stream().collect(Collectors.groupingBy(this::topicPartition));
    }

    protected abstract long offset(T var1);

    protected abstract TopicPartition topicPartition(T var1);
}

