/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import io.netty.util.internal.PlatformDependent;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.Upstream;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.func.Predicate;

/**
 * An {@link Upstream} that wraps another upstream and caches the first result accepted by a
 * predicate, handing that cached result to every downstream that connects afterwards.
 *
 * <p>While no result is cached, connecting downstreams are queued in {@link #waiting} and served
 * one at a time: a single queued downstream is used to connect to the wrapped upstream, and the
 * result it receives is either cached (and then delivered to all queued downstreams) or rejected
 * by the predicate (in which case the next queued downstream triggers a fresh connect).
 *
 * @param <T> the type of value produced by the upstream
 */
public class CachingUpstream<T>
implements Upstream<T> {
    /** The wrapped upstream; set to {@code null} once a result has been cached. */
    private Upstream<? extends T> upstream;
    /** The cached result, or {@code null} while no result has been accepted by the predicate. */
    private final AtomicReference<ExecResult<? extends T>> resultRef = new AtomicReference<>();
    /** Decides, per received result, whether that result should be cached. */
    private final Predicate<? super ExecResult<T>> predicate;
    /**
     * Set when a connect to the wrapped upstream has been claimed; cleared again when no downstream
     * was available to serve, or when the received result is not cached.
     */
    private final AtomicBoolean pending = new AtomicBoolean();
    /** Guards the body of {@link #tryDrain()} so that only one caller executes it at a time. */
    private final AtomicBoolean draining = new AtomicBoolean();
    /** Downstreams waiting for a result; created via Netty's {@code PlatformDependent.newMpscQueue()}. */
    private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

    /**
     * Creates a caching wrapper around the given upstream.
     *
     * @param upstream the upstream to connect to until a result is cached
     * @param predicate returns {@code true} for results that should be cached
     */
    public CachingUpstream(Upstream<? extends T> upstream, Predicate<? super ExecResult<T>> predicate) {
        this.upstream = upstream;
        this.predicate = predicate;
    }

    /**
     * Attempts to make progress on the queued downstreams.
     *
     * <p>If the {@link #draining} flag can be acquired: when no result is cached and no upstream
     * connect is pending, one downstream is polled and the wrapped upstream is connected on its
     * behalf; when a result is cached, it is delivered to every queued downstream. After the flag is
     * released, the method calls itself again if downstreams are still queued and either a result is
     * cached or no connect is pending.
     *
     * <p>NOTE(review): the decompiler flagged its reconstruction of this method's try/finally
     * ("Removed try catching itself") - verify the structure against the original source.
     */
    private void tryDrain() {
        if (this.draining.compareAndSet(false, true)) {
            try {
                ExecResult<? extends T> result = this.resultRef.get();
                if (result == null) {
                    // Nothing cached yet: claim the single "pending" slot before touching the upstream.
                    if (this.pending.compareAndSet(false, true)) {
                        Downstream<? super T> downstream = this.waiting.poll();
                        if (downstream == null) {
                            // Nobody to serve; give the slot back.
                            this.pending.set(false);
                        } else {
                            try {
                                // Must stay qualified with "this.": an unqualified call to a method
                                // named "yield" is rejected by Java 14+ compilers.
                                this.yield(downstream);
                            }
                            catch (Throwable e) {
                                // A failure to connect is reported as an error result.
                                // NOTE(review): this runs while "draining" is still held, so the
                                // tryDrain() call inside receiveResult cannot acquire the flag -
                                // confirm its retry condition cannot hold at that point.
                                this.receiveResult(downstream, ExecResult.of(Result.error(e)));
                            }
                        }
                    }
                } else {
                    // A result is cached: hand it to everyone currently queued.
                    // The element type must be "? super T" to match the queue and to accept the result.
                    Downstream<? super T> downstream = this.waiting.poll();
                    while (downstream != null) {
                        downstream.accept(result);
                        downstream = this.waiting.poll();
                    }
                }
            }
            finally {
                this.draining.set(false);
            }
        }
        // Retry when work remains that this call (or a concurrent one) may have missed:
        // downstreams are queued and either a result is cached or no connect is in flight.
        if (!this.waiting.isEmpty() && (this.resultRef.get() != null || !this.pending.get())) {
            this.tryDrain();
        }
    }

    /**
     * Connects the wrapped upstream, routing whatever it emits (error, success or complete) to
     * {@link #receiveResult(Downstream, ExecResult)} on behalf of the given downstream.
     *
     * @param downstream the queued downstream this connect is performed for
     * @throws Exception if connecting to the wrapped upstream throws
     */
    private void yield(final Downstream<? super T> downstream) throws Exception {
        this.upstream.connect(new Downstream<T>(){

            @Override
            public void error(Throwable throwable) {
                CachingUpstream.this.receiveResult(downstream, ExecResult.of(Result.error(throwable)));
            }

            @Override
            public void success(T value) {
                CachingUpstream.this.receiveResult(downstream, ExecResult.of(Result.success(value)));
            }

            @Override
            public void complete() {
                CachingUpstream.this.receiveResult(downstream, CompleteExecResult.get());
            }
        });
    }

    /**
     * Delivers the cached result immediately if there is one; otherwise queues an asynchronous
     * downstream, attempts a drain, and forwards that promise's eventual result to
     * {@code downstream}.
     *
     * @param downstream the downstream to deliver the result to
     * @throws Exception declared by {@link Upstream#connect}
     */
    @Override
    public void connect(Downstream<? super T> downstream) throws Exception {
        ExecResult<? extends T> result = this.resultRef.get();
        if (result == null) {
            Promise.async(d -> {
                this.waiting.add(d);
                this.tryDrain();
            }).result(downstream::accept);
        } else {
            downstream.accept(result);
        }
    }

    /**
     * Handles a result obtained from the wrapped upstream (or a connect failure).
     *
     * <p>The predicate decides whether the result is cached. If the predicate itself throws, the
     * result is not cached: an upstream error is kept and the predicate's exception is attached to it
     * as suppressed; any other result is replaced by an error result carrying the predicate's
     * exception. The (possibly replaced) result is then delivered to {@code downstream} and another
     * drain is attempted.
     *
     * @param downstream the downstream on whose behalf the upstream was connected
     * @param result the result received
     */
    private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
        boolean shouldCache = false;
        try {
            shouldCache = this.predicate.apply(result);
        }
        catch (Throwable e) {
            if (result.isError()) {
                // Keep the original upstream failure as the primary error; replacing the result here
                // would discard it together with the suppressed exception just attached.
                result.getThrowable().addSuppressed(e);
            } else {
                result = ExecResult.of(Result.error(e));
            }
        }
        if (shouldCache) {
            this.resultRef.set(result);
            // The upstream is no longer needed once a result is cached.
            this.upstream = null;
        } else {
            // Allow the next queued downstream to trigger a fresh connect.
            this.pending.set(false);
        }
        downstream.accept(result);
        this.tryDrain();
    }
}

