/*
 * Decompiled with CFR 0.152.
 */
package com.landawn.abacus.util;

import com.landawn.abacus.util.AndroidUtil;
import com.landawn.abacus.util.IOUtil;
import com.landawn.abacus.util.N;
import com.landawn.abacus.util.ObjIterator;
import com.landawn.abacus.util.Timed;
import com.landawn.abacus.util.function.Consumer;
import com.landawn.abacus.util.function.Function;
import com.landawn.abacus.util.function.Predicate;
import com.landawn.abacus.util.u;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public abstract class Observer<T> {
    /** Sentinel offered by {@link #complete(BlockingQueue)}; the queue-backed observer stops polling when it dequeues this exact instance. */
    private static final Object COMPLETE_FLAG = new Object();
    /**
     * Multiplier applied to the throttle/debounce interval when deciding whether a previously
     * scheduled flush is stale and a new one must be scheduled.
     * NOTE(review): the operators in this file compare against a literal {@code 3.0}; presumably that is this constant inlined by the compiler.
     */
    protected static final double INTERVAL_FACTOR = 3.0;
    /** No-op completion callback used when the caller supplies none. */
    protected static final Runnable EMPTY_ACTION = new Runnable(){

        @Override
        public void run() {
        }
    };
    /** Default error callback used when the caller supplies none: rethrows the error wrapped in a {@link RuntimeException}. */
    protected static final Consumer<Exception> ON_ERROR_MISSING = new Consumer<Exception>(){

        @Override
        public void accept(Exception t) {
            throw new RuntimeException(t);
        }
    };
    /**
     * Executor on which iterator- and queue-backed sources are drained.
     * NOTE(review): with an unbounded {@link LinkedBlockingQueue} a {@link ThreadPoolExecutor} never grows past its core size,
     * so the configured maximum of 64 is presumably never reached - confirm this is intended.
     */
    protected static final Executor asyncExecutor = IOUtil.IS_PLATFORM_ANDROID ? AndroidUtil.getThreadPoolExecutor() : new ThreadPoolExecutor(Math.max(8, IOUtil.CPU_CORES), Math.max(64, IOUtil.CPU_CORES), 180L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    /** Shared scheduler used by timers, intervals, throttling/debouncing flushes and time-based buffers. */
    protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(IOUtil.IS_PLATFORM_ANDROID ? IOUtil.CPU_CORES : 32);
    /** Periodic tasks registered by operators of this observer, mapped to the delay used by {@code cancelScheduledFutures()} before cancelling them. */
    protected final Map<ScheduledFuture<?>, Long> scheduledFutures = new LinkedHashMap();
    /** Head of the operator chain; each operator appends one more dispatcher to its tail. */
    protected final Dispatcher<Object> dispatcher;
    /** Cleared by {@code limit} to tell the source loop to stop pulling elements. Not volatile. */
    protected boolean hasMore = true;

    /** Creates an observer whose operator chain starts with a fresh pass-through dispatcher. */
    protected Observer() {
        this(new Dispatcher<Object>());
    }

    /**
     * Creates an observer on top of the given dispatcher.
     *
     * @param dispatcher head of the operator chain; stored as-is (no null check is performed here)
     */
    protected Observer(Dispatcher<Object> dispatcher) {
        this.dispatcher = dispatcher;
    }

    /**
     * Signals end-of-stream to an observer created by {@link #of(BlockingQueue)} by offering the
     * completion sentinel to the queue.
     *
     * <p>The sentinel is inserted with {@code offer}, so the call does not block; if the queue
     * rejects the element (for example a bounded queue that is full) the signal is not delivered.
     *
     * @param queue the queue an observer is draining
     */
    @SuppressWarnings("unchecked")
    public static void complete(BlockingQueue<?> queue) {
        // A wildcard-typed queue cannot accept an Object directly. The sentinel is only ever
        // compared by identity on the consuming side, so the unchecked view is harmless.
        ((BlockingQueue<Object>) queue).offer(COMPLETE_FLAG);
    }

    /**
     * Creates an observer that drains the given queue until {@link #complete(BlockingQueue)} is signalled.
     *
     * @param queue source of elements; validated with {@code N.checkArgNotNull}
     * @return a queue-backed observer
     */
    public static <T> Observer<T> of(BlockingQueue<T> queue) {
        N.checkArgNotNull(queue, "queue");
        final Observer<T> observer = new BlockingQueueObserver<T>(queue);
        return observer;
    }

    /**
     * Creates an observer over the elements of a collection.
     *
     * @param c source collection; a null or empty collection yields an observer over an empty iterator
     * @return an iterator-backed observer
     */
    public static <T> Observer<T> of(Collection<T> c) {
        if (N.isNullOrEmpty(c)) {
            return Observer.of(ObjIterator.<T>empty());
        }
        return Observer.of(c.iterator());
    }

    /**
     * Creates an observer that pulls elements from the given iterator.
     *
     * @param iter source iterator; validated with {@code N.checkArgNotNull}
     * @return an iterator-backed observer
     */
    public static <T> Observer<T> of(Iterator<T> iter) {
        N.checkArgNotNull(iter, "iterator");
        final Observer<T> observer = new IteratorObserver<T>(iter);
        return observer;
    }

    /**
     * Creates an observer that emits a single {@code 0L} after the given delay.
     *
     * @param delayInMillis delay in milliseconds
     * @return a timer observer
     */
    public static Observer<Long> timer(long delayInMillis) {
        return timer(delayInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates an observer that emits a single {@code 0L} after the given delay.
     *
     * @param delay delay before the emission; must not be negative
     * @param unit  unit of {@code delay}; must not be null
     * @return a timer observer
     */
    public static Observer<Long> timer(long delay, TimeUnit unit) {
        final boolean delayIsValid = delay >= 0L;
        N.checkArgument(delayIsValid, "delay can't be negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        final Observer<Long> observer = new TimerObserver<Long>(delay, unit);
        return observer;
    }

    /**
     * Emits an increasing counter every {@code periodInMillis} milliseconds, starting immediately.
     *
     * @param periodInMillis period between emissions, in milliseconds
     * @return an interval observer
     */
    public static Observer<Long> interval(long periodInMillis) {
        return interval(0L, periodInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Emits an increasing counter at a fixed period after an initial delay, both in milliseconds.
     *
     * @param initialDelayInMillis delay before the first emission
     * @param periodInMillis       period between emissions
     * @return an interval observer
     */
    public static Observer<Long> interval(long initialDelayInMillis, long periodInMillis) {
        return interval(initialDelayInMillis, periodInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Emits an increasing counter every {@code period}, starting immediately.
     *
     * @param period period between emissions
     * @param unit   unit of {@code period}
     * @return an interval observer
     */
    public static Observer<Long> interval(long period, TimeUnit unit) {
        return interval(0L, period, unit);
    }

    /**
     * Emits an increasing counter (starting at 0) at a fixed rate.
     *
     * @param initialDelay delay before the first emission; must not be negative
     * @param period       period between emissions; must be positive
     * @param unit         unit of both durations; must not be null
     * @return an interval observer
     */
    public static Observer<Long> interval(long initialDelay, long period, TimeUnit unit) {
        final boolean initialDelayIsValid = initialDelay >= 0L;
        N.checkArgument(initialDelayIsValid, "initialDelay can't be negative");
        final boolean periodIsValid = period > 0L;
        N.checkArgument(periodIsValid, "period can't be 0 or negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        final Observer<Long> observer = new IntervalObserver<Long>(initialDelay, period, unit);
        return observer;
    }

    /**
     * Debounces the stream with an interval given in milliseconds.
     *
     * @param intervalDurationInMillis quiet period in milliseconds
     * @return this observer
     * @see #debounce(long, TimeUnit)
     */
    public Observer<T> debounce(long intervalDurationInMillis) {
        return this.debounce(intervalDurationInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Forwards only the most recent element once no newer element has arrived for the given
     * interval. Elements that are superseded within the interval are dropped.
     *
     * @param intervalDuration quiet period; must not be negative, {@code 0} disables debouncing
     * @param unit             unit of {@code intervalDuration}; must not be null
     * @return this observer
     */
    public Observer<T> debounce(final long intervalDuration, final TimeUnit unit) {
        N.checkArgument(intervalDuration >= 0L, "Interval can't be negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        if (intervalDuration == 0L) {
            return this;
        }
        final long intervalDurationInMillis = unit.toMillis(intervalDuration);
        this.dispatcher.append(new Dispatcher<Object>() {
            /** Arrival time of the most recent element. */
            private long prevTimestamp = 0L;
            /** Time the last flush task was handed to the scheduler. */
            private long lastScheduledTime = 0L;

            @Override
            public void onNext(Object param) {
                synchronized (this.holder) {
                    final long now = System.currentTimeMillis();
                    // A flush is needed when nothing is pending, or when the pending flush looks
                    // stale (older than INTERVAL_FACTOR intervals) and has presumably been lost.
                    final boolean needsSchedule = this.holder.value() == N.NULL_MASK
                            || (double) (now - this.lastScheduledTime) > (double) intervalDurationInMillis * INTERVAL_FACTOR;
                    this.holder.setValue(param);
                    this.prevTimestamp = now;
                    if (needsSchedule) {
                        this.schedule(intervalDuration, unit);
                    }
                }
            }

            /** Schedules a flush; the flush re-schedules itself while newer elements keep arriving. */
            private void schedule(long delay, TimeUnit delayUnit) {
                try {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            final long pastIntervalInMillis = System.currentTimeMillis() - prevTimestamp;
                            if (pastIntervalInMillis >= intervalDurationInMillis) {
                                Object lastParam;
                                // 'holder' is the enclosing dispatcher's field; take and clear it atomically.
                                synchronized (holder) {
                                    lastParam = holder.value();
                                    holder.setValue(N.NULL_MASK);
                                }
                                if (lastParam != N.NULL_MASK && downDispatcher != null) {
                                    downDispatcher.onNext(lastParam);
                                }
                            } else {
                                // A newer element arrived: wait out the remainder of its quiet period.
                                schedule(intervalDurationInMillis - pastIntervalInMillis, TimeUnit.MILLISECONDS);
                            }
                        }
                    }, delay, delayUnit);
                    this.lastScheduledTime = System.currentTimeMillis();
                } catch (Exception e) {
                    // Scheduling failed: drop the pending value and report downstream.
                    this.holder.setValue(N.NULL_MASK);
                    if (this.downDispatcher != null) {
                        this.downDispatcher.onError(e);
                    }
                }
            }
        });
        return this;
    }

    /**
     * Throttles the stream, keeping the first element of each window; interval in milliseconds.
     *
     * @param intervalDurationInMillis window length in milliseconds
     * @return this observer
     * @see #throttleFirst(long, TimeUnit)
     */
    public Observer<T> throttleFirst(long intervalDurationInMillis) {
        return this.throttleFirst(intervalDurationInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Keeps the first element of each time window and drops the rest. The retained element is
     * forwarded by a scheduled task when the window ends, not at arrival time.
     *
     * @param intervalDuration window length; must not be negative, {@code 0} disables throttling
     * @param unit             unit of {@code intervalDuration}; must not be null
     * @return this observer
     */
    public Observer<T> throttleFirst(final long intervalDuration, final TimeUnit unit) {
        N.checkArgument(intervalDuration >= 0L, "Interval can't be negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        if (intervalDuration == 0L) {
            return this;
        }
        final long intervalDurationInMillis = unit.toMillis(intervalDuration);
        this.dispatcher.append(new Dispatcher<Object>() {
            /** Time the current window's flush task was scheduled. */
            private long lastScheduledTime = 0L;

            @Override
            public void onNext(Object param) {
                synchronized (this.holder) {
                    final long now = System.currentTimeMillis();
                    final boolean windowOpen = this.holder.value() != N.NULL_MASK
                            && (double) (now - this.lastScheduledTime) <= (double) intervalDurationInMillis * INTERVAL_FACTOR;
                    if (windowOpen) {
                        // An element is already held for this window; later ones are dropped.
                        return;
                    }
                    this.holder.setValue(param);
                    try {
                        scheduler.schedule(new Runnable() {
                            @Override
                            public void run() {
                                Object firstParam;
                                // 'holder' is the enclosing dispatcher's field; take and clear it atomically.
                                synchronized (holder) {
                                    firstParam = holder.value();
                                    holder.setValue(N.NULL_MASK);
                                }
                                if (firstParam != N.NULL_MASK && downDispatcher != null) {
                                    downDispatcher.onNext(firstParam);
                                }
                            }
                        }, intervalDuration, unit);
                        this.lastScheduledTime = now;
                    } catch (Exception e) {
                        // Scheduling failed: drop the held value and report downstream.
                        this.holder.setValue(N.NULL_MASK);
                        if (this.downDispatcher != null) {
                            this.downDispatcher.onError(e);
                        }
                    }
                }
            }
        });
        return this;
    }

    /**
     * Throttles the stream, keeping the last element of each window; interval in milliseconds.
     *
     * @param intervalDurationInMillis window length in milliseconds
     * @return this observer
     * @see #throttleLast(long, TimeUnit)
     */
    public Observer<T> throttleLast(long intervalDurationInMillis) {
        return this.throttleLast(intervalDurationInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Keeps the most recent element of each time window. The first element of a window schedules
     * a flush; later elements overwrite the held value, and the flush forwards whatever is held.
     *
     * @param intervalDuration window length; must not be negative, {@code 0} disables throttling
     * @param unit             unit of {@code intervalDuration}; must not be null
     * @return this observer
     */
    public Observer<T> throttleLast(final long intervalDuration, final TimeUnit unit) {
        // Message aligned with throttleFirst/debounce: the argument is an interval, not a delay.
        N.checkArgument(intervalDuration >= 0L, "Interval can't be negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        if (intervalDuration == 0L) {
            return this;
        }
        final long intervalDurationInMillis = unit.toMillis(intervalDuration);
        this.dispatcher.append(new Dispatcher<Object>() {
            /** Time the current window's flush task was scheduled. */
            private long lastScheduledTime = 0L;

            @Override
            public void onNext(Object param) {
                synchronized (this.holder) {
                    final long now = System.currentTimeMillis();
                    final boolean windowOpen = this.holder.value() != N.NULL_MASK
                            && (double) (now - this.lastScheduledTime) <= (double) intervalDurationInMillis * INTERVAL_FACTOR;
                    // In every case the newest element becomes the held one.
                    this.holder.setValue(param);
                    if (windowOpen) {
                        return;
                    }
                    try {
                        scheduler.schedule(new Runnable() {
                            @Override
                            public void run() {
                                Object lastParam;
                                // 'holder' is the enclosing dispatcher's field; take and clear it atomically.
                                synchronized (holder) {
                                    lastParam = holder.value();
                                    holder.setValue(N.NULL_MASK);
                                }
                                if (lastParam != N.NULL_MASK && downDispatcher != null) {
                                    downDispatcher.onNext(lastParam);
                                }
                            }
                        }, intervalDuration, unit);
                        this.lastScheduledTime = now;
                    } catch (Exception e) {
                        // Scheduling failed: drop the held value and report downstream.
                        this.holder.setValue(N.NULL_MASK);
                        if (this.downDispatcher != null) {
                            this.downDispatcher.onError(e);
                        }
                    }
                }
            }
        });
        return this;
    }

    /**
     * Delays the start of the stream by the given number of milliseconds.
     *
     * @param delayInMillis delay in milliseconds
     * @return this observer
     * @see #delay(long, TimeUnit)
     */
    public Observer<T> delay(long delayInMillis) {
        return delay(delayInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Holds back the first element until {@code delay} has elapsed since this operator was added;
     * the wait is a blocking sleep on the thread delivering that element. Later elements pass
     * straight through.
     *
     * @param delay time to wait; must not be negative, {@code 0} adds no operator
     * @param unit  unit of {@code delay}; must not be null
     * @return this observer
     */
    public Observer<T> delay(final long delay, final TimeUnit unit) {
        N.checkArgument(delay >= 0L, "Delay can't be negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        if (delay != 0L) {
            this.dispatcher.append(new Dispatcher<Object>() {
                /** Reference point for the delay: the moment the operator was created. */
                private final long startTime = System.currentTimeMillis();
                private boolean isDelayed = false;

                @Override
                public void onNext(Object param) {
                    if (!this.isDelayed) {
                        final long target = unit.toMillis(delay);
                        final long elapsed = System.currentTimeMillis() - this.startTime;
                        N.sleep(target - elapsed);
                        this.isDelayed = true;
                    }
                    if (this.downDispatcher == null) {
                        return;
                    }
                    this.downDispatcher.onNext(param);
                }
            });
        }
        return this;
    }

    /**
     * Wraps every element in a {@code Timed} carrying the milliseconds elapsed since the previous
     * element (or, for the first element, since this operator was added).
     *
     * @return this observer, viewed as a stream of timed elements
     */
    @SuppressWarnings("unchecked")
    public Observer<Timed<T>> timeInterval() {
        this.dispatcher.append(new Dispatcher<Object>() {
            /** Timestamp of the previous element; starts at operator creation. */
            private long startTime = System.currentTimeMillis();

            @Override
            public synchronized void onNext(Object param) {
                if (this.downDispatcher != null) {
                    final long now = System.currentTimeMillis();
                    final long interval = now - this.startTime;
                    this.startTime = now;
                    this.downDispatcher.onNext(Timed.of(param, interval));
                }
            }
        });
        // The chain is untyped (Dispatcher<Object>); only the static element type changes.
        return (Observer<Timed<T>>) this;
    }

    /**
     * Wraps every element in a {@code Timed} carrying {@code System.currentTimeMillis()} at the
     * moment the element passes through this operator.
     *
     * @return this observer, viewed as a stream of timed elements
     */
    @SuppressWarnings("unchecked")
    public Observer<Timed<T>> timestamp() {
        this.dispatcher.append(new Dispatcher<Object>() {
            @Override
            public void onNext(Object param) {
                if (this.downDispatcher != null) {
                    this.downDispatcher.onNext(Timed.of(param, System.currentTimeMillis()));
                }
            }
        });
        // The chain is untyped (Dispatcher<Object>); only the static element type changes.
        return (Observer<Timed<T>>) this;
    }

    /**
     * Drops the first {@code n} elements and forwards everything after them.
     *
     * @param n number of elements to drop; validated with {@code N.checkArgNotNegative}
     * @return this observer
     */
    public Observer<T> skip(final long n) {
        N.checkArgNotNegative(n, "n");
        if (n <= 0L) {
            // Nothing to skip: no operator is added to the chain.
            return this;
        }
        this.dispatcher.append(new Dispatcher<Object>() {
            private final AtomicLong counter = new AtomicLong();

            @Override
            public void onNext(Object param) {
                if (this.downDispatcher == null) {
                    return;
                }
                final long seen = this.counter.incrementAndGet();
                if (seen > n) {
                    this.downDispatcher.onNext(param);
                }
            }
        });
        return this;
    }

    /**
     * Forwards at most {@code maxSize} elements and then tells the source to stop.
     *
     * <p>The source is marked exhausted as soon as the {@code maxSize}-th element has been
     * forwarded, so a blocking source does not have to produce one more element before the
     * stream can complete.
     *
     * @param maxSize maximum number of elements to forward; validated with {@code N.checkArgNotNegative}
     * @return this observer
     */
    public Observer<T> limit(final long maxSize) {
        N.checkArgNotNegative(maxSize, "maxSize");
        this.dispatcher.append(new Dispatcher<Object>() {
            private final AtomicLong counter = new AtomicLong();

            @Override
            public void onNext(Object param) {
                if (this.downDispatcher == null) {
                    // Nobody is listening downstream; stop the source.
                    Observer.this.hasMore = false;
                    return;
                }
                final long seen = this.counter.incrementAndGet();
                if (seen <= maxSize) {
                    this.downDispatcher.onNext(param);
                }
                if (seen >= maxSize) {
                    // Limit reached (or already exceeded): no further elements are wanted.
                    Observer.this.hasMore = false;
                }
            }
        });
        return this;
    }

    /**
     * Forwards each element only the first time it is seen, using a hash set of all elements
     * seen so far (so memory grows with the number of distinct elements).
     *
     * @return this observer
     */
    public Observer<T> distinct() {
        this.dispatcher.append(new Dispatcher<Object>() {
            // Elements travel through the chain as Object, so the set is typed accordingly.
            private final Set<Object> seen = N.newHashSet();

            @Override
            public void onNext(Object param) {
                if (this.downDispatcher != null && this.seen.add(param)) {
                    this.downDispatcher.onNext(param);
                }
            }
        });
        return this;
    }

    /**
     * Forwards an element only if the key computed for it has not been seen before.
     *
     * @param keyMapper computes the key that decides distinctness
     * @return this observer
     */
    public Observer<T> distinctBy(final Function<? super T, ?> keyMapper) {
        this.dispatcher.append(new Dispatcher<Object>() {
            private final Set<Object> seenKeys = N.newHashSet();

            @Override
            @SuppressWarnings("unchecked")
            public void onNext(Object param) {
                // The chain is untyped; at this position the element is a T by construction.
                if (this.downDispatcher != null && this.seenKeys.add(keyMapper.apply((T) param))) {
                    this.downDispatcher.onNext(param);
                }
            }
        });
        return this;
    }

    /**
     * Forwards only the elements accepted by the predicate.
     *
     * @param filter predicate deciding which elements pass
     * @return this observer
     */
    public Observer<T> filter(final Predicate<? super T> filter) {
        this.dispatcher.append(new Dispatcher<Object>() {
            @Override
            @SuppressWarnings("unchecked")
            public void onNext(Object param) {
                // The chain is untyped; at this position the element is a T by construction.
                if (this.downDispatcher != null && filter.test((T) param)) {
                    this.downDispatcher.onNext(param);
                }
            }
        });
        return this;
    }

    /**
     * Transforms every element with the given function.
     *
     * @param map function applied to each element
     * @return this observer, viewed as a stream of mapped elements
     */
    @SuppressWarnings("unchecked")
    public <U> Observer<U> map(final Function<? super T, U> map) {
        this.dispatcher.append(new Dispatcher<Object>() {
            @Override
            public void onNext(Object param) {
                if (this.downDispatcher != null) {
                    // The chain is untyped; at this position the element is a T by construction.
                    this.downDispatcher.onNext(map.apply((T) param));
                }
            }
        });
        // Only the static element type changes; the dispatcher chain is shared.
        return (Observer<U>) this;
    }

    /**
     * Maps every element to a collection and forwards the collection's items one by one, in
     * iteration order. A null or empty collection forwards nothing.
     *
     * @param map function producing the items for each element
     * @return this observer, viewed as a stream of the produced items
     */
    @SuppressWarnings("unchecked")
    public <U> Observer<U> flatMap(final Function<? super T, Collection<U>> map) {
        this.dispatcher.append(new Dispatcher<Object>() {
            @Override
            public void onNext(Object param) {
                if (this.downDispatcher == null) {
                    // Without a consumer the mapper is not invoked at all.
                    return;
                }
                // The chain is untyped; at this position the element is a T by construction.
                final Collection<U> items = map.apply((T) param);
                if (N.notNullOrEmpty(items)) {
                    for (U item : items) {
                        this.downDispatcher.onNext(item);
                    }
                }
            }
        });
        // Only the static element type changes; the dispatcher chain is shared.
        return (Observer<U>) this;
    }

    /**
     * Collects elements into lists that are emitted every {@code timespan}.
     *
     * @param timespan length of each buffering window; must be positive
     * @param unit     unit of {@code timespan}; must not be null
     * @return this observer, viewed as a stream of lists
     * @see #buffer(long, TimeUnit, int)
     */
    public Observer<List<T>> buffer(long timespan, TimeUnit unit) {
        return this.buffer(timespan, unit, Integer.MAX_VALUE);
    }

    /**
     * Collects elements into lists. A list is emitted whenever {@code timespan} elapses (even if
     * it is empty) or as soon as it holds {@code count} elements, whichever happens first.
     *
     * @param timespan length of each buffering window; must be positive
     * @param unit     unit of {@code timespan}; must not be null
     * @param count    maximum size of an emitted list; must be positive
     * @return this observer, viewed as a stream of lists
     */
    @SuppressWarnings("unchecked")
    public Observer<List<T>> buffer(final long timespan, final TimeUnit unit, final int count) {
        N.checkArgument(timespan > 0L, "timespan can't be 0 or negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        N.checkArgument(count > 0, "count can't be 0 or negative");
        this.dispatcher.append(new Dispatcher<Object>() {
            // Elements travel through the chain as Object, so the buffer is typed accordingly.
            private final List<Object> queue = new ArrayList<Object>();
            {
                final ScheduledFuture<?> flushTask = scheduler.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        List<Object> list;
                        synchronized (queue) {
                            list = new ArrayList<Object>(queue);
                            queue.clear();
                        }
                        if (downDispatcher != null) {
                            downDispatcher.onNext(list);
                        }
                    }
                }, timespan, timespan, unit);
                // cancelScheduledFutures() does millisecond arithmetic on this value, so the
                // period has to be recorded in milliseconds rather than in the caller's unit.
                Observer.this.scheduledFutures.put(flushTask, unit.toMillis(timespan));
            }

            @Override
            public void onNext(Object param) {
                List<Object> list = null;
                synchronized (this.queue) {
                    this.queue.add(param);
                    if (this.queue.size() == count) {
                        // Size limit reached: emit early instead of waiting for the timer.
                        list = new ArrayList<Object>(this.queue);
                        this.queue.clear();
                    }
                }
                if (list != null && this.downDispatcher != null) {
                    this.downDispatcher.onNext(list);
                }
            }
        });
        // Only the static element type changes; the dispatcher chain is shared.
        return (Observer<List<T>>) this;
    }

    /**
     * Collects elements into lists during {@code timespan}, then ignores elements for
     * {@code timeskip}, repeatedly.
     *
     * @param timespan length of each collecting window; must be positive
     * @param timeskip length of the pause after each window; must be positive
     * @param unit     unit of both durations; must not be null
     * @return this observer, viewed as a stream of lists
     * @see #buffer(long, long, TimeUnit, int)
     */
    public Observer<List<T>> buffer(long timespan, long timeskip, TimeUnit unit) {
        return this.buffer(timespan, timeskip, unit, Integer.MAX_VALUE);
    }

    /**
     * Collects elements into lists during {@code timespan}, then ignores elements for
     * {@code timeskip}, repeatedly. A list is emitted at the end of each collecting window (even
     * if empty) or as soon as it holds {@code count} elements.
     *
     * @param timespan length of each collecting window; must be positive
     * @param timeskip length of the pause after each window; must be positive
     * @param unit     unit of both durations; must not be null
     * @param count    maximum size of an emitted list; must be positive
     * @return this observer, viewed as a stream of lists
     */
    @SuppressWarnings("unchecked")
    public Observer<List<T>> buffer(final long timespan, final long timeskip, final TimeUnit unit, final int count) {
        N.checkArgument(timespan > 0L, "timespan can't be 0 or negative");
        N.checkArgument(timeskip > 0L, "timeskip can't be 0 or negative");
        N.checkArgNotNull(unit, "Time unit can't be null");
        N.checkArgument(count > 0, "count can't be 0 or negative");
        this.dispatcher.append(new Dispatcher<Object>() {
            private final long startTime = System.currentTimeMillis();
            /** Full cycle (collect + skip) in the caller's unit; used for scheduling. */
            private final long interval = timespan + timeskip;
            // Elapsed time is measured in milliseconds, so the window test needs millisecond
            // durations. The cycle is clamped to 1 ms so sub-millisecond units cannot make the
            // modulo below divide by zero.
            private final long timespanInMillis = unit.toMillis(timespan);
            private final long intervalInMillis = Math.max(1L, unit.toMillis(this.interval));
            // Elements travel through the chain as Object, so the buffer is typed accordingly.
            private final List<Object> queue = new ArrayList<Object>();
            {
                final ScheduledFuture<?> flushTask = scheduler.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        List<Object> list;
                        synchronized (queue) {
                            list = new ArrayList<Object>(queue);
                            queue.clear();
                        }
                        if (downDispatcher != null) {
                            downDispatcher.onNext(list);
                        }
                    }
                }, timespan, this.interval, unit);
                // cancelScheduledFutures() does millisecond arithmetic on this value.
                Observer.this.scheduledFutures.put(flushTask, unit.toMillis(this.interval));
            }

            @Override
            public void onNext(Object param) {
                final long positionInCycle = (System.currentTimeMillis() - this.startTime) % this.intervalInMillis;
                if (positionInCycle > this.timespanInMillis) {
                    // Inside the skip part of the cycle: the element is dropped.
                    return;
                }
                List<Object> list = null;
                synchronized (this.queue) {
                    this.queue.add(param);
                    if (this.queue.size() == count) {
                        // Size limit reached: emit early instead of waiting for the timer.
                        list = new ArrayList<Object>(this.queue);
                        this.queue.clear();
                    }
                }
                if (list != null && this.downDispatcher != null) {
                    this.downDispatcher.onNext(list);
                }
            }
        });
        // Only the static element type changes; the dispatcher chain is shared.
        return (Observer<List<T>>) this;
    }

    /**
     * Starts the stream, delivering elements to {@code action}. Errors go to
     * {@code ON_ERROR_MISSING}, which rethrows them wrapped in a {@link RuntimeException}.
     *
     * @param action receives every element that reaches the end of the operator chain
     */
    public void observe(Consumer<? super T> action) {
        this.observe(action, ON_ERROR_MISSING);
    }

    /**
     * Starts the stream with an element callback and an error callback; completion is a no-op.
     *
     * @param action  receives every element that reaches the end of the operator chain
     * @param onError receives errors reported by the source or the operators
     */
    public void observe(Consumer<? super T> action, Consumer<? super Exception> onError) {
        this.observe(action, onError, EMPTY_ACTION);
    }

    /**
     * Starts the stream. Implemented by each concrete source.
     *
     * @param var1 element callback (the {@code action} of the overloads above)
     * @param var2 error callback
     * @param var3 completion callback
     */
    public abstract void observe(Consumer<? super T> var1, Consumer<? super Exception> var2, Runnable var3);

    /**
     * Cancels every periodic task registered in {@code scheduledFutures}. Before each cancel the
     * calling thread sleeps for roughly two of the task's recorded delays, measured from the
     * start of this call, so a final scheduled run can still happen. The recorded delay is
     * combined with {@code System.currentTimeMillis()} differences, i.e. treated as milliseconds.
     */
    void cancelScheduledFutures() {
        final long begin = System.currentTimeMillis();
        if (!N.notNullOrEmpty(this.scheduledFutures)) {
            return;
        }
        for (Map.Entry<ScheduledFuture<?>, Long> registered : this.scheduledFutures.entrySet()) {
            final long delay = registered.getValue();
            final long elapsed = System.currentTimeMillis() - begin;
            // One delay to reach the next tick plus one more as a safety margin.
            N.sleep(delay - elapsed + delay);
            final ScheduledFuture<?> task = registered.getKey();
            task.cancel(false);
        }
    }

    static {
        // Remove cancelled tasks from the scheduler's work queue at cancellation time.
        scheduler.setRemoveOnCancelPolicy(true);
    }

    /**
     * Source that emits an increasing {@code long} counter (starting at 0) at a fixed rate on the
     * shared scheduler until {@code hasMore} is cleared or dispatching an element fails.
     */
    static final class IntervalObserver<T> extends ObserverBase<T> {
        private final long initialDelay;
        private final long period;
        private final TimeUnit unit;
        /** Handle of the periodic task; assigned by {@code observe}. */
        private ScheduledFuture<?> future = null;

        IntervalObserver(long initialDelay, long period, TimeUnit unit) {
            this.initialDelay = initialDelay;
            this.period = period;
            this.unit = unit;
        }

        /**
         * Appends the terminal dispatcher and starts the periodic emission.
         *
         * @param action     receives each counter value that reaches the end of the chain
         * @param onError    error callback wired into the terminal dispatcher
         * @param onComplete run once, on the first tick after {@code hasMore} was cleared
         */
        @Override
        public void observe(final Consumer<? super T> action, Consumer<? super Exception> onError, Runnable onComplete) {
            N.checkArgNotNull(action, "action");
            this.dispatcher.append(new DispatcherBase<Object>(onError, onComplete) {
                @Override
                @SuppressWarnings("unchecked")
                public void onNext(Object param) {
                    // The chain is untyped; whatever reaches the end is a T by construction.
                    action.accept((T) param);
                }
            });
            this.future = scheduler.scheduleAtFixedRate(new Runnable() {
                private long val = 0L;

                @Override
                public void run() {
                    if (!IntervalObserver.this.hasMore) {
                        try {
                            IntervalObserver.this.dispatcher.onComplete();
                        } finally {
                            this.stop();
                        }
                        // The stream is finished: no element may be emitted after completion.
                        return;
                    }
                    try {
                        IntervalObserver.this.dispatcher.onNext(this.val++);
                    } catch (Exception e) {
                        // A failing consumer ends the interval; the exception itself is not
                        // forwarded (behaviour kept from the original implementation).
                        this.stop();
                    }
                }

                /** Cancels the periodic task and then any operator-owned scheduled tasks. */
                private void stop() {
                    try {
                        IntervalObserver.this.future.cancel(true);
                    } finally {
                        IntervalObserver.this.cancelScheduledFutures();
                    }
                }
            }, this.initialDelay, this.period, this.unit);
        }
    }

    /** Source that emits a single {@code 0L} after a delay and then completes. */
    static final class TimerObserver<T> extends ObserverBase<T> {
        private final long delay;
        private final TimeUnit unit;

        TimerObserver(long delay, TimeUnit unit) {
            this.delay = delay;
            this.unit = unit;
        }

        /**
         * Appends the terminal dispatcher and schedules the single emission.
         *
         * @param action     receives the value if it reaches the end of the chain
         * @param onError    error callback wired into the terminal dispatcher
         * @param onComplete run directly after the value has been dispatched
         */
        @Override
        public void observe(final Consumer<? super T> action, Consumer<? super Exception> onError, final Runnable onComplete) {
            N.checkArgNotNull(action, "action");
            this.dispatcher.append(new DispatcherBase<Object>(onError, onComplete) {
                @Override
                @SuppressWarnings("unchecked")
                public void onNext(Object param) {
                    // The chain is untyped; whatever reaches the end is a T by construction.
                    action.accept((T) param);
                }
            });
            scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimerObserver.this.dispatcher.onNext(0L);
                        onComplete.run();
                    } finally {
                        // Always release operator-owned scheduled tasks, even on failure.
                        TimerObserver.this.cancelScheduledFutures();
                    }
                }
            }, this.delay, this.unit);
        }
    }

    /** Source that pulls elements from an {@link Iterator} on {@code asyncExecutor}. */
    static final class IteratorObserver<T> extends ObserverBase<T> {
        private final Iterator<T> iter;

        IteratorObserver(Iterator<T> iter) {
            this.iter = iter;
        }

        /**
         * Appends the terminal dispatcher and drains the iterator asynchronously.
         *
         * @param action     receives each element that reaches the end of the chain
         * @param onError    receives exceptions raised while checking the loop condition
         *                   ({@code hasNext()}); exceptions raised while fetching or dispatching an
         *                   element, or by {@code onComplete}, are rethrown as runtime exceptions
         * @param onComplete run after the iterator is exhausted or {@code hasMore} was cleared
         */
        @Override
        public void observe(final Consumer<? super T> action, final Consumer<? super Exception> onError, final Runnable onComplete) {
            N.checkArgNotNull(action, "action");
            this.dispatcher.append(new DispatcherBase<Object>(onError, onComplete) {
                @Override
                @SuppressWarnings("unchecked")
                public void onNext(Object param) {
                    // The chain is untyped; whatever reaches the end is a T by construction.
                    action.accept((T) param);
                }
            });
            asyncExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    // true while the loop condition is evaluated, false while an element is
                    // fetched/dispatched or the completion callback runs.
                    boolean isOnError = true;
                    try {
                        while (IteratorObserver.this.hasMore && IteratorObserver.this.iter.hasNext()) {
                            isOnError = false;
                            IteratorObserver.this.dispatcher.onNext(IteratorObserver.this.iter.next());
                            isOnError = true;
                        }
                        isOnError = false;
                        onComplete.run();
                    } catch (Exception e) {
                        if (!isOnError) {
                            throw N.toRuntimeException(e);
                        }
                        onError.accept(e);
                    } finally {
                        IteratorObserver.this.cancelScheduledFutures();
                    }
                }
            });
        }
    }

    /**
     * Source that drains a {@link BlockingQueue} on {@code asyncExecutor} until the completion
     * sentinel is dequeued or {@code hasMore} is cleared.
     */
    static final class BlockingQueueObserver<T> extends ObserverBase<T> {
        private final BlockingQueue<T> queue;

        BlockingQueueObserver(BlockingQueue<T> queue) {
            this.queue = queue;
        }

        /**
         * Appends the terminal dispatcher and starts polling the queue asynchronously.
         *
         * @param action     receives each element that reaches the end of the chain
         * @param onError    receives exceptions raised while polling the queue (including
         *                   interruption); exceptions raised while dispatching an element, or by
         *                   {@code onComplete}, are rethrown as runtime exceptions
         * @param onComplete run after the sentinel was seen or {@code hasMore} was cleared
         */
        @Override
        public void observe(final Consumer<? super T> action, final Consumer<? super Exception> onError, final Runnable onComplete) {
            N.checkArgNotNull(action, "action");
            this.dispatcher.append(new DispatcherBase<Object>(onError, onComplete) {
                @Override
                @SuppressWarnings("unchecked")
                public void onNext(Object param) {
                    // The chain is untyped; whatever reaches the end is a T by construction.
                    action.accept((T) param);
                }
            });
            asyncExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    // true while polling the queue, false while an element is dispatched or the
                    // completion callback runs.
                    boolean isOnError = true;
                    try {
                        while (BlockingQueueObserver.this.hasMore) {
                            // Effectively waits forever for the next element.
                            final Object next = BlockingQueueObserver.this.queue.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                            if (next == COMPLETE_FLAG) {
                                break;
                            }
                            isOnError = false;
                            BlockingQueueObserver.this.dispatcher.onNext(next);
                            isOnError = true;
                        }
                        isOnError = false;
                        onComplete.run();
                    } catch (Exception e) {
                        if (!isOnError) {
                            throw N.toRuntimeException(e);
                        }
                        onError.accept(e);
                    } finally {
                        BlockingQueueObserver.this.cancelScheduledFutures();
                    }
                }
            });
        }
    }

    /** Common base of the concrete sources in this file; adds nothing to {@link Observer} itself. */
    protected static abstract class ObserverBase<T>
    extends Observer<T> {
        protected ObserverBase() {
        }
    }

    /**
     * Terminal dispatcher: instead of forwarding errors and completion further down the chain it
     * hands them to the callbacks supplied by the subscriber. Subclasses provide {@code onNext}.
     */
    protected static abstract class DispatcherBase<T> extends Dispatcher<T> {
        private final Consumer<? super Exception> errorHandler;
        private final Runnable completionAction;

        /**
         * @param onError    callback invoked by {@link #onError(Exception)}
         * @param onComplete callback invoked by {@link #onComplete()}
         */
        protected DispatcherBase(Consumer<? super Exception> onError, Runnable onComplete) {
            this.errorHandler = onError;
            this.completionAction = onComplete;
        }

        /** Passes the error to the subscriber's error callback. */
        @Override
        public void onError(Exception error) {
            errorHandler.accept(error);
        }

        /** Runs the subscriber's completion callback. */
        @Override
        public void onComplete() {
            completionAction.run();
        }
    }

    /**
     * One link of the operator chain. The default implementation passes every signal unchanged
     * to the next link, if there is one; operators override {@code onNext}.
     */
    protected static class Dispatcher<T> {
        /** Scratch slot for operators that hold back a value; {@code N.NULL_MASK} marks it empty. */
        protected final u.Holder<Object> holder = u.Holder.of(N.NULL_MASK);
        /** Next link in the chain, or null at the tail. */
        protected Dispatcher<T> downDispatcher;

        protected Dispatcher() {
        }

        /** Forwards an element to the next link; a no-op at the tail. */
        public void onNext(T value) {
            if (this.downDispatcher == null) {
                return;
            }
            this.downDispatcher.onNext(value);
        }

        /** Forwards an error to the next link; a no-op at the tail. */
        public void onError(Exception error) {
            if (this.downDispatcher == null) {
                return;
            }
            this.downDispatcher.onError(error);
        }

        /** Forwards completion to the next link; a no-op at the tail. */
        public void onComplete() {
            if (this.downDispatcher == null) {
                return;
            }
            this.downDispatcher.onComplete();
        }

        /**
         * Attaches {@code downDispatcher} after the current tail of the chain that starts here.
         *
         * @param downDispatcher the link to add
         */
        public void append(Dispatcher<T> downDispatcher) {
            Dispatcher<T> tail = this;
            for (Dispatcher<T> next = this.downDispatcher; next != null; next = next.downDispatcher) {
                tail = next;
            }
            tail.downDispatcher = downDispatcher;
        }
    }

    /** Singly linked node: a value plus a reference to the next node. Not referenced elsewhere in this file. */
    protected static class Node<T> {
        /** Payload of this node. */
        public final T value;
        /** Following node, or null if there is none. */
        public Node<T> next;

        /** Creates a node with no successor. */
        public Node(T value) {
            this(value, null);
        }

        /**
         * @param value payload of the node
         * @param next  following node, may be null
         */
        public Node(T value, Node<T> next) {
            this.value = value;
            this.next = next;
        }
    }
}

