/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.Upstream;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.func.Function;

/**
 * An {@link Upstream} that caches the result produced by a delegate upstream and replays it
 * to subsequent downstreams until the cached entry expires.
 *
 * <p>The lifetime of each cached result is decided by a TTL function applied to the result:
 * <ul>
 *   <li>a negative duration caches the result with no expiry and releases the delegate upstream;</li>
 *   <li>a zero duration stores the result with an expiry one second in the past, so the next
 *       connect triggers a new fetch;</li>
 *   <li>a positive duration caches the result until {@code now + ttl} according to the clock.</li>
 * </ul>
 *
 * @param <T> the type of value produced by the upstream
 */
public class CachingUpstream<T> implements Upstream<T> {
    /** The delegate that is fetched from; set to {@code null} once a result is cached with no expiry. */
    private Upstream<? extends T> upstream;
    /** Source of "now" used to compute and check expiry instants. */
    private final Clock clock;
    /** The most recently stored result, or {@code null} if nothing has been fetched yet. */
    private final AtomicReference<Cached<? extends T>> ref = new AtomicReference<>();
    /** Computes how long a given result should be cached for. */
    private final Function<? super ExecResult<T>, Duration> ttlFunc;
    /** {@code true} while a fetch from the delegate upstream is in flight. */
    private final AtomicBoolean pending = new AtomicBoolean();
    /** {@code true} while some caller is inside the guarded section of {@link #tryDrain()}. */
    private final AtomicBoolean draining = new AtomicBoolean();
    /** Downstreams that connected while no valid cached result was available. */
    private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

    /**
     * Creates a caching upstream that uses the system UTC clock.
     *
     * @param upstream the delegate to fetch results from
     * @param ttl function computing the cache duration for a result
     */
    public CachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
        this(upstream, ttl, Clock.systemUTC());
    }

    /**
     * Creates a caching upstream with an explicit clock.
     *
     * @param upstream the delegate to fetch results from
     * @param ttl function computing the cache duration for a result
     * @param clock the clock used for computing and checking expiry
     */
    @VisibleForTesting
    CachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
        this.upstream = upstream;
        this.ttlFunc = ttl;
        this.clock = clock;
    }

    /**
     * Attempts to service the queued downstreams.
     *
     * <p>Only the caller that wins the {@code draining} flag enters the guarded section. Inside it,
     * if a fetch is needed and none is pending, one waiting downstream is polled and used to drive
     * a fetch from the delegate; otherwise, if the cached result is still valid, it is delivered to
     * every queued downstream.
     */
    private void tryDrain() {
        if (this.draining.compareAndSet(false, true)) {
            try {
                Cached<? extends T> cached = this.ref.get();
                if (this.needsFetch(cached)) {
                    if (this.pending.compareAndSet(false, true)) {
                        Downstream<? super T> downstream = this.waiting.poll();
                        if (downstream == null) {
                            // Nothing is waiting, so no fetch was started; clear the flag again.
                            this.pending.set(false);
                        } else {
                            try {
                                this.yield(downstream);
                            }
                            catch (Throwable e) {
                                // Connecting to the delegate failed; treat the failure as the result.
                                this.receiveResult(downstream, ExecResult.of(Result.error(e)));
                            }
                        }
                    }
                } else {
                    // Cached result is valid: hand it to everyone currently queued.
                    Downstream<? super T> downstream = this.waiting.poll();
                    while (downstream != null) {
                        downstream.accept(cached.result);
                        downstream = this.waiting.poll();
                    }
                }
            }
            finally {
                this.draining.set(false);
            }
        }
        // Downstreams may have been queued, or the cache may have expired, since the checks above;
        // go around again if there is work that no in-flight fetch will pick up.
        if (!this.waiting.isEmpty() && !this.pending.get() && this.needsFetch(this.ref.get())) {
            this.tryDrain();
        }
    }

    /**
     * Tells whether a new fetch from the delegate is required.
     *
     * @param cached the currently stored entry, possibly {@code null}
     * @return {@code true} if there is no entry, or the entry has an expiry that is before now;
     *         an entry with a {@code null} expiry never needs a fetch
     */
    private boolean needsFetch(Cached<? extends T> cached) {
        return cached == null || cached.expireAt != null && cached.expireAt.isBefore(this.clock.instant());
    }

    /**
     * Connects to the delegate upstream and routes whatever it emits (error, success or complete)
     * to {@link #receiveResult(Downstream, ExecResult)} on behalf of the given downstream.
     *
     * @param downstream the waiting downstream on whose behalf the fetch is performed
     * @throws Exception any exception thrown by the delegate's {@code connect}
     */
    private void yield(final Downstream<? super T> downstream) throws Exception {
        this.upstream.connect(new Downstream<T>(){

            @Override
            public void error(Throwable throwable) {
                CachingUpstream.this.receiveResult(downstream, ExecResult.of(Result.error(throwable)));
            }

            @Override
            public void success(T value) {
                CachingUpstream.this.receiveResult(downstream, ExecResult.of(Result.success(value)));
            }

            @Override
            public void complete() {
                CachingUpstream.this.receiveResult(downstream, CompleteExecResult.get());
            }
        });
    }

    /**
     * Delivers the cached result to the downstream if it is still valid; otherwise queues the
     * downstream (via {@code Promise.async}) and attempts to drain the queue.
     *
     * @param downstream the consumer of the result
     * @throws Exception declared by the {@link Upstream} contract
     */
    @Override
    public void connect(Downstream<? super T> downstream) throws Exception {
        Cached<? extends T> cached = this.ref.get();
        if (this.needsFetch(cached)) {
            Promise.async(d -> {
                this.waiting.add(d);
                this.tryDrain();
            }).result(downstream::accept);
        } else {
            downstream.accept(cached.result);
        }
    }

    /**
     * Stores a freshly fetched result with its computed expiry, clears the pending flag, delivers
     * the result to the downstream that triggered the fetch, and then attempts to drain the queue.
     *
     * <p>If the TTL function throws, the TTL stays at zero. When the fetched result is already an
     * error, the TTL failure is attached to the original throwable as a suppressed exception and
     * the original error is kept; otherwise the TTL failure becomes the result.
     *
     * @param downstream the downstream that triggered the fetch
     * @param result the result obtained from the delegate upstream
     */
    private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
        Instant expiresAt;
        Duration ttl = Duration.ofSeconds(0L);
        try {
            ttl = this.ttlFunc.apply(result);
        }
        catch (Throwable e) {
            if (result.isError()) {
                // Keep the original upstream error as the primary failure.
                result.getThrowable().addSuppressed(e);
            } else {
                result = ExecResult.of(Result.error(e));
            }
        }
        if (ttl.isNegative()) {
            // No expiry: the delegate will not be used again, so drop the reference to it.
            expiresAt = null;
            this.upstream = null;
        } else {
            // Zero TTL is stored as already expired so the next connect fetches again.
            expiresAt = ttl.isZero() ? this.clock.instant().minus(Duration.ofSeconds(1L)) : this.clock.instant().plus(ttl);
        }
        this.ref.set(new Cached<T>(result, expiresAt));
        this.pending.set(false);
        downstream.accept(result);
        this.tryDrain();
    }

    /**
     * A stored result together with the instant at which it expires.
     *
     * @param <T> the type of the cached value
     */
    private static class Cached<T> {
        /** The cached result. */
        final ExecResult<T> result;
        /** When the entry expires; {@code null} means it never expires. */
        final Instant expireAt;

        Cached(ExecResult<T> result, Instant expireAt) {
            this.result = result;
            this.expireAt = expireAt;
        }
    }
}

