/*
 * Decompiled with CFR 0.152.
 */
package io.github.tsegismont.streamutils.impl;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import java.util.Objects;
import java.util.function.Predicate;

public final class FilteringStream<T>
implements ReadStream<T> {
    private final ReadStream<T> source;
    private final Predicate<T> predicate;
    private final Context context;
    private final InboundBuffer<T> queue;
    private boolean ended;
    private Throwable error;
    private boolean stopped;
    private int inFlight;
    private Handler<T> handler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;

    public FilteringStream(ReadStream<T> source, Predicate<T> predicate) {
        this(source, predicate, Vertx.currentContext());
    }

    public FilteringStream(ReadStream<T> source, Predicate<T> predicate, Vertx vertx) {
        this(source, predicate, vertx.getOrCreateContext());
    }

    public FilteringStream(ReadStream<T> source, Predicate<T> predicate, Context context) {
        Objects.requireNonNull(source, "Source cannot be null");
        Objects.requireNonNull(predicate, "Filtering function cannot be null");
        Objects.requireNonNull(context, "Context cannot be null");
        this.source = source;
        this.predicate = predicate;
        this.context = context;
        this.queue = new InboundBuffer(context).exceptionHandler(throwable -> this.notifyTerminalHandler(this.getExceptionHandler(), throwable)).drainHandler(v -> source.resume());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            if (!this.stopped) {
                this.exceptionHandler = handler;
            }
        }
        return this;
    }

    private synchronized Handler<Throwable> getExceptionHandler() {
        return this.exceptionHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReadStream<T> handler(Handler<T> handler) {
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            if (this.stopped) {
                return this;
            }
        }
        if (handler == null) {
            this.notifyTerminalHandler(this.getEndHandler(), null);
            return this;
        }
        this.handler = handler;
        this.queue.handler(item -> this.handleOnContext(this::emit, item));
        this.source.handler(item -> this.handleOnContext(this::filter, item)).exceptionHandler(throwable -> this.handleOnContext(this::error, throwable)).endHandler(v -> this.handleOnContext(this::exhausted, null));
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void error(Throwable t) {
        boolean terminate;
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            this.error = t;
            terminate = !this.stopped && this.inFlight == 0;
        }
        if (terminate) {
            this.notifyTerminalHandler(this.getExceptionHandler(), t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void exhausted(T item) {
        boolean terminate;
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            this.ended = true;
            terminate = !this.stopped && this.inFlight == 0;
        }
        if (terminate) {
            this.notifyTerminalHandler(this.getEndHandler(), null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void filter(T item) {
        if (this.predicate.test(item)) {
            InboundBuffer<T> inboundBuffer = this.queue;
            synchronized (inboundBuffer) {
                ++this.inFlight;
            }
            if (!this.queue.write(item)) {
                this.source.pause();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void emit(T item) {
        Handler<T> h;
        int terminate = 0;
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            --this.inFlight;
            Handler<T> handler = h = this.stopped ? null : this.handler;
            terminate = this.stopped || this.inFlight > 0 ? 0 : (this.error != null ? 2 : (this.ended ? 1 : 0));
        }
        if (h != null) {
            h.handle(item);
        }
        if (terminate == 1) {
            this.notifyTerminalHandler(this.getEndHandler(), null);
        } else if (terminate == 2) {
            this.notifyTerminalHandler(this.getExceptionHandler(), this.error);
        }
    }

    private <U> void handleOnContext(Handler<U> handler, U value) {
        if (this.context == Vertx.currentContext()) {
            handler.handle(value);
        } else {
            this.context.runOnContext(v -> handler.handle(value));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReadStream<T> pause() {
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            if (!this.stopped) {
                this.queue.pause();
            }
        }
        return this;
    }

    public ReadStream<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReadStream<T> fetch(long amount) {
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            if (!this.stopped) {
                this.queue.fetch(amount);
            }
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReadStream<T> endHandler(Handler<Void> endHandler) {
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            if (!this.stopped) {
                this.endHandler = endHandler;
            }
        }
        return this;
    }

    private synchronized Handler<Void> getEndHandler() {
        return this.endHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <V> void notifyTerminalHandler(Handler<V> handler, V value) {
        Handler h;
        InboundBuffer<T> inboundBuffer = this.queue;
        synchronized (inboundBuffer) {
            if (!this.stopped) {
                this.stopped = true;
                this.queue.handler(null).drainHandler(null);
                h = handler;
            } else {
                h = null;
            }
        }
        if (h != null) {
            if (this.context != Vertx.currentContext()) {
                this.context.runOnContext(v -> h.handle(value));
            } else {
                h.handle(value);
            }
        }
    }
}

