/*
 * Decompiled with CFR 0.152.
 */
package io.github.tsegismont.streamutils.impl;

import io.vertx.core.Handler;
import io.vertx.core.impl.Arguments;
import io.vertx.core.streams.ReadStream;
import java.util.Objects;

/**
 * A {@link ReadStream} decorator that forwards at most {@code limit} items from a source
 * stream and then signals the end of the stream.
 *
 * <p>Terminal notification (end or failure) is delivered at most once: whichever of
 * "limit reached", "source ended" or "source failed" happens first wins, and the handlers
 * registered on the source are removed at that point.
 *
 * <p>With a limit of {@code 0}, no item is ever forwarded; the end handler is invoked when
 * the source delivers its first item (which is dropped), or when the source itself ends.
 *
 * <p>Mutable state ({@code received}, {@code stopped} and the registered handlers) is guarded
 * by this instance's monitor. User-supplied handlers are always invoked outside of it.
 *
 * @param <T> the type of the items flowing through the stream
 */
public final class LimitingStream<T> implements ReadStream<T> {
    private final ReadStream<T> source;
    private final long limit;
    // Number of items received from the source so far, including dropped ones.
    private long received;
    // Set once a terminal event has been dispatched; nothing is forwarded afterwards.
    private boolean stopped;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;

    /**
     * Creates a stream that forwards at most {@code limit} items of {@code source}.
     *
     * @param source the stream to read from, must not be {@code null}
     * @param limit the maximum number of items to forward, must be greater than or equal to zero
     * @throws NullPointerException if {@code source} is {@code null}
     */
    public LimitingStream(ReadStream<T> source, long limit) {
        Objects.requireNonNull(source, "Source cannot be null");
        // Zero is accepted, so the message says "non-negative" rather than "positive".
        Arguments.require(limit >= 0L, "Limit must be non-negative");
        this.source = source;
        this.limit = limit;
    }

    /**
     * Registers the handler notified when the source stream fails.
     *
     * @param handler the failure handler, may be {@code null}
     * @return this stream
     */
    public synchronized ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    private synchronized Handler<Throwable> getExceptionHandler() {
        return this.exceptionHandler;
    }

    /**
     * Registers the item handler and subscribes to the source stream.
     *
     * <p>Passing {@code null} only removes the item handler from the source. Otherwise the
     * exception, end and item handlers are installed on the source, and each incoming item is
     * forwarded to {@code handler} as long as the limit has not been exceeded.
     *
     * @param handler the item handler, or {@code null} to stop receiving items
     * @return this stream
     */
    public ReadStream<T> handler(Handler<T> handler) {
        if (handler == null) {
            this.source.handler(null);
            return this;
        }
        this.source
            .exceptionHandler(throwable -> this.notifyTerminalHandler(this.getExceptionHandler(), throwable))
            .endHandler(v -> this.notifyTerminalHandler(this.getEndHandler(), null))
            .handler(item -> {
                boolean emit;
                boolean terminate;
                synchronized (this) {
                    ++this.received;
                    emit = !this.stopped && this.received <= this.limit;
                    // End on the item that reaches the limit; with a zero limit, end on the
                    // very first item, which is never forwarded.
                    terminate = !this.stopped
                        && ((this.received == 1L && this.limit == 0L) || this.received == this.limit);
                }
                // Handlers run outside the monitor.
                if (emit) {
                    handler.handle(item);
                }
                if (terminate) {
                    this.notifyTerminalHandler(this.getEndHandler(), null);
                }
            });
        return this;
    }

    /**
     * Pauses the source stream.
     *
     * @return this stream
     */
    public ReadStream<T> pause() {
        this.source.pause();
        return this;
    }

    /**
     * Resumes the stream by fetching {@link Long#MAX_VALUE} items.
     *
     * @return this stream
     */
    public ReadStream<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    /**
     * Delegates the fetch request to the source stream.
     *
     * @param l the amount passed to the source's {@code fetch}
     * @return this stream
     */
    public ReadStream<T> fetch(long l) {
        this.source.fetch(l);
        return this;
    }

    /**
     * Registers the handler notified when this stream ends, either because the limit was
     * reached or because the source stream ended.
     *
     * @param handler the end handler, may be {@code null}
     * @return this stream
     */
    public synchronized ReadStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    private synchronized Handler<Void> getEndHandler() {
        return this.endHandler;
    }

    /**
     * Dispatches a terminal event at most once.
     *
     * <p>The first call marks the stream as stopped and removes the handlers from the source
     * while holding the monitor, then invokes {@code handler} (if not {@code null}) outside of
     * it. Subsequent calls do nothing.
     *
     * @param handler the terminal handler to notify, may be {@code null}
     * @param value the value passed to the handler
     * @param <V> the type of the value passed to the handler
     */
    private <V> void notifyTerminalHandler(Handler<V> handler, V value) {
        Handler<V> h;
        synchronized (this) {
            if (!this.stopped) {
                this.stopped = true;
                this.source.handler(null).exceptionHandler(null).endHandler(null);
                h = handler;
            } else {
                h = null;
            }
        }
        if (h != null) {
            h.handle(value);
        }
    }
}

