/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import com.google.common.base.Stopwatch;
import io.netty.util.internal.PlatformDependent;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import ratpack.exec.Downstream;
import ratpack.exec.ExecResult;
import ratpack.exec.Promise;
import ratpack.exec.Result;
import ratpack.exec.Upstream;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.func.Function;

public class CachingUpstream<T>
implements Upstream<T> {
    private Upstream<? extends T> upstream;
    private final AtomicReference<Loading> loadingRef = new AtomicReference<Loading>(new Loading());
    private final Function<? super ExecResult<T>, Duration> ttlFunc;
    private final AtomicBoolean draining = new AtomicBoolean();
    private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

    public CachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
        this.upstream = upstream;
        this.ttlFunc = ttl;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void tryDrain() {
        if (this.draining.compareAndSet(false, true)) {
            try {
                if (!this.waiting.isEmpty()) {
                    Loading loading = this.loadingRef.get();
                    LoadingState state = loading.getState();
                    if (state == LoadingState.INIT) {
                        this.startLoad(loading);
                    } else if (state == LoadingState.EXPIRED) {
                        Loading newLoading = new Loading();
                        this.loadingRef.compareAndSet(loading, newLoading);
                        this.startLoad(this.loadingRef.get());
                    } else if (state == LoadingState.LOADED) {
                        Downstream downstream = this.waiting.poll();
                        while (downstream != null) {
                            downstream.accept(loading.cached.result);
                            downstream = this.waiting.poll();
                        }
                    }
                }
            }
            finally {
                this.draining.set(false);
            }
            if (!this.waiting.isEmpty() && this.loadingRef.get().getState() != LoadingState.PENDING) {
                this.tryDrain();
            }
        }
    }

    private void startLoad(Loading loading) {
        if (loading.pending.compareAndSet(false, true)) {
            Downstream<? super T> downstream = this.waiting.poll();
            try {
                this.yield(loading, downstream);
            }
            catch (Throwable e) {
                this.receiveResult(loading, downstream, ExecResult.of(Result.error(e)));
            }
        }
    }

    private void yield(final Loading loading, final Downstream<? super T> downstream) throws Exception {
        this.upstream.connect(new Downstream<T>(){

            @Override
            public void error(Throwable throwable) {
                CachingUpstream.this.receiveResult(loading, downstream, ExecResult.of(Result.error(throwable)));
            }

            @Override
            public void success(T value) {
                CachingUpstream.this.receiveResult(loading, downstream, ExecResult.of(Result.success(value)));
            }

            @Override
            public void complete() {
                CachingUpstream.this.receiveResult(loading, downstream, CompleteExecResult.get());
            }
        });
    }

    @Override
    public void connect(Downstream<? super T> downstream) {
        Loading loading = this.loadingRef.get();
        if (loading.getState() == LoadingState.LOADED) {
            downstream.accept(loading.cached.result);
        } else {
            Promise.async(d -> {
                this.waiting.add(d);
                this.tryDrain();
            }).result(downstream::accept);
        }
    }

    private void receiveResult(Loading loading, Downstream<? super T> downstream, ExecResult<T> result) {
        Duration ttl = Duration.ofSeconds(0L);
        try {
            ttl = this.ttlFunc.apply(result);
        }
        catch (Throwable e) {
            if (result.isError()) {
                result.getThrowable().addSuppressed(e);
            }
            result = ExecResult.of(Result.error(e));
        }
        if (ttl.isNegative()) {
            this.upstream = null;
        }
        loading.cached = new Cached<T>(result, ttl);
        downstream.accept(result);
        this.tryDrain();
    }

    private class Loading {
        volatile Cached<T> cached;
        final AtomicBoolean pending = new AtomicBoolean();

        private Loading() {
        }

        private LoadingState getState() {
            if (this.cached == null) {
                if (this.pending.get()) {
                    return LoadingState.PENDING;
                }
                return LoadingState.INIT;
            }
            if (this.cached.isExpired()) {
                return LoadingState.EXPIRED;
            }
            return LoadingState.LOADED;
        }
    }

    static enum LoadingState {
        INIT,
        PENDING,
        EXPIRED,
        LOADED;

    }

    private static class Cached<T> {
        final ExecResult<T> result;
        private final Duration ttl;
        private final Stopwatch stopwatch;

        Cached(ExecResult<T> result, Duration ttl) {
            this.result = result;
            this.ttl = ttl;
            this.stopwatch = !ttl.isNegative() ? Stopwatch.createStarted() : null;
        }

        boolean isExpired() {
            return this.stopwatch != null && this.stopwatch.elapsed().compareTo(this.ttl) >= 0;
        }
    }
}

