/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.InnerConsumer;
import reactor.core.publisher.InnerProducer;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

final class FluxReplay<T>
extends ConnectableFlux<T>
implements Scannable,
Fuseable {
    /** Upstream publisher that a connection subscribes to. */
    final Publisher<T> source;
    /** Replay capacity; also reported as the prefetch. {@code Integer.MAX_VALUE} selects the unbounded buffer in {@code newState()}. */
    final int history;
    /** Maximum age handed to the time-bound buffer, which compares it against {@code scheduler.now(TimeUnit.MILLISECONDS)}. */
    final long ttl;
    /** Time source for the time-bound buffer; when {@code null}, {@code newState()} picks one of the buffers that never expire. */
    final Scheduler scheduler;
    /** Current connection state, or {@code null} when none exists; updated through {@link #CONNECTION}. */
    volatile ReplaySubscriber<T> connection;
    /** Atomic updater for {@link #connection}. */
    static final AtomicReferenceFieldUpdater<FluxReplay, ReplaySubscriber> CONNECTION = AtomicReferenceFieldUpdater.newUpdater(FluxReplay.class, ReplaySubscriber.class, "connection");

    /**
     * Creates a replaying connectable flux over {@code source}.
     *
     * @param source    the upstream publisher, must not be {@code null}
     * @param history   number of items to retain, must not be negative
     * @param ttl       maximum age of retained items; only validated when a scheduler is given
     * @param scheduler time source for time-based retention, or {@code null} for none
     * @throws NullPointerException     if {@code source} is {@code null}
     * @throws IllegalArgumentException if {@code history} is negative, or {@code ttl} is
     *                                  negative while a scheduler is supplied
     */
    FluxReplay(Publisher<T> source, int history, long ttl, @Nullable Scheduler scheduler) {
        // Validate everything up front (same order as before), then assign.
        Objects.requireNonNull(source, "source");
        if (history < 0) {
            throw new IllegalArgumentException("History cannot be negative : " + history);
        }
        if (ttl < 0L && scheduler != null) {
            throw new IllegalArgumentException("TTL cannot be negative : " + ttl);
        }
        this.source = source;
        this.history = history;
        this.ttl = ttl;
        this.scheduler = scheduler;
    }

    /**
     * Reports the configured history size as this operator's prefetch.
     *
     * @return the {@code history} value given at construction
     */
    @Override
    public int getPrefetch() {
        return history;
    }

    /**
     * Builds a fresh, unconnected connection state with the buffer flavour that matches
     * this operator's configuration: time-and-size bound when a scheduler is present,
     * unbounded when {@code history} is {@code Integer.MAX_VALUE}, size bound otherwise.
     *
     * @return a new {@link ReplaySubscriber} owned by this instance
     */
    ReplaySubscriber<T> newState() {
        final ReplayBuffer<T> buffer;
        if (scheduler != null) {
            buffer = new SizeAndTimeBoundReplayBuffer<T>(history, ttl, scheduler);
        } else if (history == Integer.MAX_VALUE) {
            buffer = new UnboundedReplayBuffer<T>(Queues.SMALL_BUFFER_SIZE);
        } else {
            buffer = new SizeBoundReplayBuffer<T>(history);
        }
        return new ReplaySubscriber<T>(buffer, this);
    }

    /**
     * Ensures a connection state exists, hands it to {@code cancelSupport} as the
     * {@link Disposable} for this connection, and subscribes it to the source if this
     * call is the one that flipped it to "connected".
     *
     * @param cancelSupport receives the connection's {@link Disposable}
     */
    @Override
    public void connect(Consumer<? super Disposable> cancelSupport) {
        ReplaySubscriber<T> current = connection;
        while (current == null) {
            ReplaySubscriber<T> fresh = newState();
            // Install our state; if another thread raced us, re-read and retry.
            current = CONNECTION.compareAndSet(this, null, fresh) ? fresh : connection;
        }

        final boolean firstToConnect = current.tryConnect();
        // The disposable is published before subscribing to the source.
        cancelSupport.accept(current);
        if (firstToConnect) {
            source.subscribe(current);
        }
    }

    /**
     * Registers {@code actual} with the current connection, creating one if needed.
     *
     * <p>When a scheduler is configured and the current connection's buffer reports itself
     * expired, the connection is replaced by a fresh one which is immediately subscribed
     * to the source. Otherwise an existing connection is reused, or a new (not yet
     * connected) one is installed.
     *
     * @param actual the downstream subscriber
     */
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        ReplayInner<T> inner = new ReplayInner<T>(actual);
        ReplaySubscriber<T> c;
        for (;;) {
            c = this.connection;
            if (this.scheduler != null && c != null && c.buffer.isExpired()) {
                ReplaySubscriber<T> u = this.newState();
                if (!CONNECTION.compareAndSet(this, c, u)) {
                    // Lost the race to replace the expired connection; start over.
                    continue;
                }
                c = u;
                this.source.subscribe(u);
                break;
            }
            if (c != null) {
                break;
            }
            ReplaySubscriber<T> u = this.newState();
            if (CONNECTION.compareAndSet(this, null, u)) {
                c = u;
                break;
            }
        }
        c.add(inner);
        actual.onSubscribe(inner);
        if (inner.isCancelled()) {
            // Cancelled from within onSubscribe: parent was still null there, so
            // cancel() could not deregister the inner itself.
            c.remove(inner);
        }
        // Publish the parent only now; request()/cancel() skip parent work while it is null.
        inner.parent = c;
        c.buffer.replay(inner);
    }

    /**
     * Exposes introspection attributes: the source as {@code PARENT} and the history
     * size as {@code PREFETCH}. Any other key yields {@code null}.
     *
     * @param key the attribute being queried
     * @return the attribute value, or {@code null} when not supported
     */
    @Override
    @Nullable
    public Object scanUnsafe(Scannable.Attr key) {
        final Object result;
        if (Scannable.Attr.PARENT == key) {
            result = source;
        } else if (Scannable.Attr.PREFETCH == key) {
            result = getPrefetch();
        } else {
            result = null;
        }
        return result;
    }

    /**
     * Per-subscriber state handed to the downstream as its subscription. It tracks the
     * outstanding demand, the subscriber's replay position inside the shared buffer
     * ({@code node} / {@code index} / {@code tailIndex}) and a work-in-progress counter
     * used by the buffers to serialise replay passes.
     *
     * <p>Cancellation is encoded by storing {@code Long.MIN_VALUE} in {@code requested}.
     */
    static final class ReplayInner<T>
    implements ReplaySubscription<T> {
        /** Downstream subscriber receiving the replayed signals. */
        final CoreSubscriber<? super T> actual;
        /**
         * Owning connection. Assigned by {@code FluxReplay.subscribe} after
         * {@code onSubscribe} has been called, so it can still be {@code null} when
         * {@link #request(long)} or {@link #cancel()} run early.
         */
        ReplaySubscriber<T> parent;
        /** Absolute replay position (used by the unbounded buffer). */
        int index;
        /** Position inside the current array segment (used by the unbounded buffer). */
        int tailIndex;
        /** Buffer-specific cursor object; {@code null} means "start from the buffer head". */
        Object node;
        /** Fusion mode negotiated in {@link #requestFusion(int)}; 0 means not fused. */
        int fusionMode;
        volatile int wip;
        static final AtomicIntegerFieldUpdater<ReplayInner> WIP = AtomicIntegerFieldUpdater.newUpdater(ReplayInner.class, "wip");
        /** Outstanding demand, or {@code Long.MIN_VALUE} once cancelled. */
        volatile long requested;
        static final AtomicLongFieldUpdater<ReplayInner> REQUESTED = AtomicLongFieldUpdater.newUpdater(ReplayInner.class, "requested");

        ReplayInner(CoreSubscriber<? super T> actual) {
            this.actual = actual;
        }

        /**
         * Adds demand (only when not fused) and triggers a replay pass if the parent
         * connection is already known.
         *
         * @param n the additional demand; validated through {@code Operators.validate}
         */
        @Override
        public void request(long n) {
            if (Operators.validate(n)) {
                if (this.fusionMode() == 0) {
                    Operators.addCapCancellable(REQUESTED, this, n);
                }
                ReplaySubscriber<T> p = this.parent;
                if (p != null) {
                    p.buffer.replay(this);
                }
            }
        }

        @Override
        @Nullable
        public Object scanUnsafe(Scannable.Attr key) {
            if (key == Scannable.Attr.PARENT) {
                return this.parent;
            }
            if (key == Scannable.Attr.TERMINATED) {
                return this.parent != null && this.parent.isTerminated();
            }
            if (key == Scannable.Attr.BUFFERED) {
                return this.size();
            }
            if (key == Scannable.Attr.CANCELLED) {
                return this.isCancelled();
            }
            if (key == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
                // The cancelled marker is negative; report it as zero demand.
                return Math.max(0L, this.requested);
            }
            return ReplaySubscription.super.scanUnsafe(key);
        }

        /**
         * Marks this subscription cancelled. Only the first call has any effect: it
         * deregisters from the parent (when known) and, if no replay pass is currently
         * running, drops the cursor reference.
         */
        @Override
        public void cancel() {
            if (REQUESTED.getAndSet(this, Long.MIN_VALUE) != Long.MIN_VALUE) {
                ReplaySubscriber<T> p = this.parent;
                if (p != null) {
                    p.remove(this);
                }
                // enter() is never balanced by leave() here, so wip stays non-zero afterwards.
                if (this.enter()) {
                    this.node = null;
                }
            }
        }

        @Override
        public long requested() {
            return this.requested;
        }

        @Override
        public boolean isCancelled() {
            return this.requested == Long.MIN_VALUE;
        }

        @Override
        public CoreSubscriber<? super T> actual() {
            return this.actual;
        }

        /**
         * Grants fusion only when bit 2 of {@code requestedMode} is set
         * (presumably {@code Fuseable.ASYNC} -- confirm against the Fuseable constants).
         *
         * @return 2 when granted, 0 otherwise
         */
        @Override
        public int requestFusion(int requestedMode) {
            if ((requestedMode & 2) != 0) {
                this.fusionMode = 2;
                return 2;
            }
            return 0;
        }

        @Override
        @Nullable
        public T poll() {
            ReplaySubscriber<T> p = this.parent;
            if (p != null) {
                return p.buffer.poll(this);
            }
            return null;
        }

        @Override
        public void clear() {
            ReplaySubscriber<T> p = this.parent;
            if (p != null) {
                p.buffer.clear(this);
            }
        }

        @Override
        public boolean isEmpty() {
            ReplaySubscriber<T> p = this.parent;
            return p == null || p.buffer.isEmpty(this);
        }

        @Override
        public int size() {
            ReplaySubscriber<T> p = this.parent;
            if (p != null) {
                return p.buffer.size(this);
            }
            return 0;
        }

        @Override
        public void node(@Nullable Object node) {
            this.node = node;
        }

        @Override
        public int fusionMode() {
            return this.fusionMode;
        }

        @Override
        @Nullable
        public Object node() {
            return this.node;
        }

        @Override
        public int index() {
            return this.index;
        }

        @Override
        public void index(int index) {
            this.index = index;
        }

        @Override
        public int tailIndex() {
            return this.tailIndex;
        }

        @Override
        public void tailIndex(int tailIndex) {
            this.tailIndex = tailIndex;
        }

        /** @return {@code true} when the caller is the one that moved {@code wip} from 0, i.e. may run the replay pass */
        @Override
        public boolean enter() {
            return WIP.getAndIncrement(this) == 0;
        }

        /** @return the remaining {@code wip} count after subtracting {@code missed}; non-zero means another pass is needed */
        @Override
        public int leave(int missed) {
            return WIP.addAndGet(this, -missed);
        }

        /**
         * Subtracts {@code n} delivered items from the outstanding demand.
         *
         * <p>The cancelled marker ({@code Long.MIN_VALUE}) is left untouched: blindly
         * subtracting from it would wrap around to a positive number, making
         * {@link #isCancelled()} report {@code false} and letting a replay pass resume
         * emitting to a subscriber that has already cancelled.
         *
         * @param n number of items that were emitted
         */
        @Override
        public void produced(long n) {
            for (;;) {
                long current = this.requested;
                if (current == Long.MIN_VALUE) {
                    return;
                }
                if (REQUESTED.compareAndSet(this, current, current - n)) {
                    return;
                }
            }
        }
    }

    /**
     * One connection to the source: subscribes upstream with unbounded demand, stores
     * every signal in a {@link ReplayBuffer} and pushes replay passes to the registered
     * downstream subscriptions. It is also the {@link Disposable} for the connection.
     *
     * <p>The subscriber array is replaced (never mutated in place) under the instance
     * monitor; {@link #TERMINATED} marks that no further registration is accepted.
     */
    static final class ReplaySubscriber<T>
    implements InnerConsumer<T>,
    Disposable {
        /** Operator that owns this connection. */
        final FluxReplay<T> parent;
        /** Storage and replay strategy shared by all downstream subscriptions. */
        final ReplayBuffer<T> buffer;
        /** Upstream subscription, managed through {@link #S}. */
        volatile Subscription s;
        static final AtomicReferenceFieldUpdater<ReplaySubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(ReplaySubscriber.class, Subscription.class, "s");
        /** Current downstream subscriptions; {@link #EMPTY} or {@link #TERMINATED} are identity markers. */
        volatile ReplaySubscription<T>[] subscribers;
        volatile int wip;
        static final AtomicIntegerFieldUpdater<ReplaySubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(ReplaySubscriber.class, "wip");
        /** 0 until {@link #tryConnect()} succeeds, 1 afterwards. */
        volatile int connected;
        static final AtomicIntegerFieldUpdater<ReplaySubscriber> CONNECTED = AtomicIntegerFieldUpdater.newUpdater(ReplaySubscriber.class, "connected");
        static final ReplaySubscription[] EMPTY = new ReplaySubscription[0];
        static final ReplaySubscription[] TERMINATED = new ReplaySubscription[0];
        /** Set once {@link #dispose()} has taken effect. */
        volatile boolean cancelled;

        @SuppressWarnings("unchecked")
        ReplaySubscriber(ReplayBuffer<T> buffer, FluxReplay<T> parent) {
            this.buffer = buffer;
            this.parent = parent;
            this.subscribers = EMPTY;
        }

        /**
         * Accepts the upstream subscription and requests {@code Long.MAX_VALUE}, unless
         * the buffer is already done, in which case the subscription is cancelled.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (this.buffer.isDone()) {
                s.cancel();
            } else if (Operators.setOnce(S, this, s)) {
                s.request(Long.MAX_VALUE);
            }
        }

        /** Stores the item and runs a replay pass for every current subscriber; drops it if the buffer is done. */
        @Override
        public void onNext(T t) {
            ReplayBuffer<T> b = this.buffer;
            if (b.isDone()) {
                Operators.onNextDropped(t, this.currentContext());
            } else {
                b.add(t);
                for (ReplaySubscription<T> rs : this.subscribers) {
                    b.replay(rs);
                }
            }
        }

        /** Records the error, closes registration and replays to the subscribers that were registered; drops it if the buffer is done. */
        @Override
        public void onError(Throwable t) {
            ReplayBuffer<T> b = this.buffer;
            if (b.isDone()) {
                Operators.onErrorDropped(t, this.currentContext());
            } else {
                b.onError(t);
                for (ReplaySubscription<T> rs : this.terminate()) {
                    b.replay(rs);
                }
            }
        }

        /** Records completion, closes registration and replays to the subscribers that were registered; ignored if the buffer is done. */
        @Override
        public void onComplete() {
            ReplayBuffer<T> b = this.buffer;
            if (!b.isDone()) {
                b.onComplete();
                for (ReplaySubscription<T> rs : this.terminate()) {
                    b.replay(rs);
                }
            }
        }

        /**
         * Disconnects from the source. When this call is the one that terminates the
         * upstream subscription, the connection is marked cancelled, detached from the
         * owning operator and a {@link CancellationException} is signalled to the
         * registered subscribers through the buffer.
         */
        @Override
        public void dispose() {
            if (this.cancelled) {
                return;
            }
            if (Operators.terminate(S, this)) {
                this.cancelled = true;
                // Only detach if the operator still points at this connection. Clearing
                // unconditionally would wipe a newer connection when a stale one (for
                // example one replaced after expiry in subscribe()) is disposed late.
                CONNECTION.compareAndSet(this.parent, this, null);
                CancellationException ex = new CancellationException("Disconnected");
                this.buffer.onError(ex);
                for (ReplaySubscription<T> inner : this.terminate()) {
                    this.buffer.replay(inner);
                }
            }
        }

        /**
         * Registers {@code inner} by publishing a new array with it appended.
         *
         * @return {@code false} when registration is closed ({@link #TERMINATED}), {@code true} otherwise
         */
        @SuppressWarnings("unchecked")
        boolean add(ReplayInner<T> inner) {
            // Cheap check without the lock, repeated under the lock below.
            if (this.subscribers == TERMINATED) {
                return false;
            }
            synchronized (this) {
                ReplaySubscription<T>[] a = this.subscribers;
                if (a == TERMINATED) {
                    return false;
                }
                int n = a.length;
                ReplaySubscription<T>[] b = new ReplayInner[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = inner;
                this.subscribers = b;
                return true;
            }
        }

        /**
         * Deregisters {@code inner} by publishing a new array without it. Does nothing
         * when the array is {@link #EMPTY}, {@link #TERMINATED}, or does not contain
         * {@code inner} (identity comparison).
         */
        @SuppressWarnings("unchecked")
        void remove(ReplaySubscription<T> inner) {
            ReplaySubscription<T>[] a = this.subscribers;
            if (a == TERMINATED || a == EMPTY) {
                return;
            }
            synchronized (this) {
                a = this.subscribers;
                if (a == TERMINATED || a == EMPTY) {
                    return;
                }
                int j = -1;
                int n = a.length;
                for (int i = 0; i < n; ++i) {
                    if (a[i] == inner) {
                        j = i;
                        break;
                    }
                }
                if (j < 0) {
                    return;
                }
                ReplaySubscription<T>[] b;
                if (n == 1) {
                    b = EMPTY;
                } else {
                    b = new ReplayInner[n - 1];
                    System.arraycopy(a, 0, b, 0, j);
                    System.arraycopy(a, j + 1, b, j, n - j - 1);
                }
                this.subscribers = b;
            }
        }

        /**
         * Closes registration by swapping in {@link #TERMINATED}.
         *
         * @return the array that was current before the swap (or {@link #TERMINATED}
         *         itself when already closed)
         */
        @SuppressWarnings("unchecked")
        ReplaySubscription<T>[] terminate() {
            ReplaySubscription<T>[] a = this.subscribers;
            if (a == TERMINATED) {
                return a;
            }
            synchronized (this) {
                a = this.subscribers;
                if (a != TERMINATED) {
                    this.subscribers = TERMINATED;
                }
                return a;
            }
        }

        boolean isTerminated() {
            return this.subscribers == TERMINATED;
        }

        /** @return {@code true} for exactly one caller: the one that flips {@code connected} from 0 to 1 */
        boolean tryConnect() {
            return this.connected == 0 && CONNECTED.compareAndSet(this, 0, 1);
        }

        @Override
        @Nullable
        public Object scanUnsafe(Scannable.Attr key) {
            if (key == Scannable.Attr.PARENT) {
                return this.s;
            }
            if (key == Scannable.Attr.PREFETCH) {
                return Integer.MAX_VALUE;
            }
            if (key == Scannable.Attr.CAPACITY) {
                return this.buffer.capacity();
            }
            if (key == Scannable.Attr.ERROR) {
                return this.buffer.getError();
            }
            if (key == Scannable.Attr.BUFFERED) {
                return this.buffer.size();
            }
            if (key == Scannable.Attr.TERMINATED) {
                return this.isTerminated();
            }
            if (key == Scannable.Attr.CANCELLED) {
                return this.cancelled;
            }
            return null;
        }

        @Override
        public Stream<? extends Scannable> inners() {
            return Stream.of(this.subscribers);
        }

        @Override
        public boolean isDisposed() {
            return this.cancelled;
        }
    }

    /**
     * Replay buffer that retains at most {@code limit} items in a singly linked list.
     * {@code head} is a sentinel: the first retained item is {@code head.get()}. When
     * the list is full, adding an item advances {@code head} by one node.
     *
     * <p>Each subscription keeps its own cursor node ({@code rs.node()}); a
     * {@code null} cursor means "start from the current head".
     */
    static final class SizeBoundReplayBuffer<T>
    implements ReplayBuffer<T> {
        final int limit;
        volatile Node<T> head;
        Node<T> tail;
        int size;
        volatile boolean done;
        Throwable error;

        /**
         * @param limit maximum number of retained items
         * @throws IllegalArgumentException if {@code limit} is negative
         */
        SizeBoundReplayBuffer(int limit) {
            if (limit < 0) {
                throw new IllegalArgumentException("Limit cannot be negative");
            }
            this.limit = limit;
            Node<T> n = new Node<T>(null);
            this.tail = n;
            this.head = n;
        }

        @Override
        public boolean isExpired() {
            return false;
        }

        @Override
        public int capacity() {
            return this.limit;
        }

        /** Appends {@code value}; once {@code limit} items are held, the oldest one is dropped by advancing {@code head}. */
        @Override
        public void add(T value) {
            Node<T> n = new Node<T>(value);
            this.tail.set(n);
            this.tail = n;
            int s = this.size;
            if (s == this.limit) {
                this.head = this.head.get();
            } else {
                this.size = s + 1;
            }
        }

        @Override
        public void onError(Throwable ex) {
            // Plain write first, volatile write second: a reader that sees done also sees error.
            this.error = ex;
            this.done = true;
        }

        @Override
        public void onComplete() {
            this.done = true;
        }

        /** Emits {@code onError} when an error was recorded, {@code onComplete} otherwise. */
        private void signalTerminal(CoreSubscriber<? super T> a) {
            Throwable ex = this.error;
            if (ex != null) {
                a.onError(ex);
            } else {
                a.onComplete();
            }
        }

        /**
         * Non-fused drain loop: emits items from the subscription's cursor up to its
         * current demand, then the terminal signal once the buffer is done and the
         * cursor has reached the end. Repeats while {@code rs.leave} reports missed work.
         */
        @SuppressWarnings("unchecked")
        void replayNormal(ReplaySubscription<T> rs) {
            final CoreSubscriber<? super T> a = rs.actual();
            int missed = 1;
            do {
                long r = rs.requested();
                long e = 0L;
                Node<T> node = (Node<T>) rs.node();
                if (node == null) {
                    node = this.head;
                }
                while (e != r) {
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }
                    // Read done before looking at the next node.
                    boolean d = this.done;
                    Node<T> next = node.get();
                    boolean empty = next == null;
                    if (d && empty) {
                        rs.node(null);
                        this.signalTerminal(a);
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(next.value);
                    ++e;
                    node = next;
                }
                if (e == r) {
                    // Demand exhausted (possibly zero): a terminal signal needs no demand.
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }
                    boolean d = this.done;
                    boolean empty = node.get() == null;
                    if (d && empty) {
                        rs.node(null);
                        this.signalTerminal(a);
                        return;
                    }
                }
                if (e != 0L && r != Long.MAX_VALUE) {
                    rs.produced(e);
                }
                rs.node(node);
                missed = rs.leave(missed);
            } while (missed != 0);
        }

        /**
         * Fused drain loop: signals {@code onNext(null)} so the downstream polls, and
         * follows with the terminal signal when the buffer was done before that call.
         */
        void replayFused(ReplaySubscription<T> rs) {
            int missed = 1;
            final CoreSubscriber<? super T> a = rs.actual();
            do {
                if (rs.isCancelled()) {
                    rs.node(null);
                    return;
                }
                boolean d = this.done;
                a.onNext(null);
                if (d) {
                    this.signalTerminal(a);
                    return;
                }
                missed = rs.leave(missed);
            } while (missed != 0);
        }

        /** Runs a replay pass for {@code rs} unless one is already in progress ({@code rs.enter()} fails). */
        @Override
        public void replay(ReplaySubscription<T> rs) {
            if (!rs.enter()) {
                return;
            }
            if (rs.fusionMode() == 0) {
                this.replayNormal(rs);
            } else {
                this.replayFused(rs);
            }
        }

        @Override
        @Nullable
        public Throwable getError() {
            return this.error;
        }

        @Override
        public boolean isDone() {
            return this.done;
        }

        /** @return the next item after the subscription's cursor (advancing it), or {@code null} when none is available */
        @Override
        @Nullable
        @SuppressWarnings("unchecked")
        public T poll(ReplaySubscription<T> rs) {
            Node<T> node = (Node<T>) rs.node();
            if (node == null) {
                node = this.head;
                rs.node(node);
            }
            Node<T> next = node.get();
            if (next == null) {
                return null;
            }
            rs.node(next);
            return next.value;
        }

        @Override
        public void clear(ReplaySubscription<T> rs) {
            rs.node(null);
        }

        @Override
        @SuppressWarnings("unchecked")
        public boolean isEmpty(ReplaySubscription<T> rs) {
            Node<T> node = (Node<T>) rs.node();
            if (node == null) {
                node = this.head;
                rs.node(node);
            }
            return node.get() == null;
        }

        /** Counts the nodes that follow {@code start}, capped at {@code Integer.MAX_VALUE}. */
        private int countAfter(Node<T> start) {
            Node<T> node = start;
            int count = 0;
            Node<T> next;
            while ((next = node.get()) != null && count != Integer.MAX_VALUE) {
                node = next;
                ++count;
            }
            return count;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int size(ReplaySubscription<T> rs) {
            Node<T> node = (Node<T>) rs.node();
            if (node == null) {
                node = this.head;
            }
            return this.countAfter(node);
        }

        @Override
        public int size() {
            return this.countAfter(this.head);
        }

        /** Linked-list node; the inherited atomic reference is the link to the next node. */
        static final class Node<T>
        extends AtomicReference<Node<T>> {
            private static final long serialVersionUID = 3713592843205853725L;
            final T value;

            Node(@Nullable T value) {
                this.value = value;
            }
        }
    }

    /**
     * Replay buffer that retains every item. Items are stored in a chain of
     * {@code Object[]} segments of length {@code batchSize + 1}: slots
     * {@code 0..batchSize-1} hold items and the last slot holds the link to the next
     * segment.
     *
     * <p>Each subscription tracks its position with {@code index} (absolute count),
     * {@code tailIndex} (offset in the current segment) and {@code node} (the segment).
     */
    static final class UnboundedReplayBuffer<T>
    implements ReplayBuffer<T> {
        final int batchSize;
        /** Number of items stored so far; written in {@link #add} and read by replay passes. */
        volatile int size;
        final Object[] head;
        Object[] tail;
        int tailIndex;
        volatile boolean done;
        Throwable error;

        UnboundedReplayBuffer(int batchSize) {
            this.batchSize = batchSize;
            Object[] n = new Object[batchSize + 1];
            this.tail = n;
            this.head = n;
        }

        @Override
        public boolean isExpired() {
            return false;
        }

        @Override
        @Nullable
        public Throwable getError() {
            return this.error;
        }

        @Override
        public int capacity() {
            return Integer.MAX_VALUE;
        }

        /** Appends {@code value}, linking a new segment through the last slot when the current one is full. */
        @Override
        public void add(T value) {
            int i = this.tailIndex;
            Object[] a = this.tail;
            if (i == a.length - 1) {
                Object[] b = new Object[a.length];
                b[0] = value;
                this.tailIndex = 1;
                a[i] = b;
                this.tail = b;
            } else {
                a[i] = value;
                this.tailIndex = i + 1;
            }
            // Volatile write last, after the slot has been filled.
            ++this.size;
        }

        @Override
        public void onError(Throwable ex) {
            // Plain write first, volatile write second: a reader that sees done also sees error.
            this.error = ex;
            this.done = true;
        }

        @Override
        public void onComplete() {
            this.done = true;
        }

        /** Emits {@code onError} when an error was recorded, {@code onComplete} otherwise. */
        private void signalTerminal(CoreSubscriber<? super T> a) {
            Throwable ex = this.error;
            if (ex != null) {
                a.onError(ex);
            } else {
                a.onComplete();
            }
        }

        /**
         * Non-fused drain loop: emits items from the subscription's position up to its
         * current demand, then the terminal signal once the buffer is done and the
         * position has caught up with {@code size}. Repeats while {@code rs.leave}
         * reports missed work.
         */
        @SuppressWarnings("unchecked")
        void replayNormal(ReplaySubscription<T> rs) {
            int missed = 1;
            final CoreSubscriber<? super T> a = rs.actual();
            final int n = this.batchSize;
            do {
                long r = rs.requested();
                long e = 0L;
                Object[] node = (Object[]) rs.node();
                if (node == null) {
                    node = this.head;
                }
                int tailIndex = rs.tailIndex();
                int index = rs.index();
                while (e != r) {
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }
                    // Read done before size.
                    boolean d = this.done;
                    boolean empty = index == this.size;
                    if (d && empty) {
                        rs.node(null);
                        this.signalTerminal(a);
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    if (tailIndex == n) {
                        // End of segment: follow the link stored in the last slot.
                        node = (Object[]) node[tailIndex];
                        tailIndex = 0;
                    }
                    T v = (T) node[tailIndex];
                    a.onNext(v);
                    ++e;
                    ++tailIndex;
                    ++index;
                }
                if (e == r) {
                    // Demand exhausted (possibly zero): a terminal signal needs no demand.
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }
                    boolean d = this.done;
                    boolean empty = index == this.size;
                    if (d && empty) {
                        rs.node(null);
                        this.signalTerminal(a);
                        return;
                    }
                }
                if (e != 0L && r != Long.MAX_VALUE) {
                    rs.produced(e);
                }
                rs.index(index);
                rs.tailIndex(tailIndex);
                rs.node(node);
                missed = rs.leave(missed);
            } while (missed != 0);
        }

        /**
         * Fused drain loop: signals {@code onNext(null)} so the downstream polls, and
         * follows with the terminal signal when the buffer was done before that call.
         */
        void replayFused(ReplaySubscription<T> rs) {
            int missed = 1;
            final CoreSubscriber<? super T> a = rs.actual();
            do {
                if (rs.isCancelled()) {
                    rs.node(null);
                    return;
                }
                boolean d = this.done;
                a.onNext(null);
                if (d) {
                    this.signalTerminal(a);
                    return;
                }
                missed = rs.leave(missed);
            } while (missed != 0);
        }

        /** Runs a replay pass for {@code rs} unless one is already in progress ({@code rs.enter()} fails). */
        @Override
        public void replay(ReplaySubscription<T> rs) {
            if (!rs.enter()) {
                return;
            }
            if (rs.fusionMode() == 0) {
                this.replayNormal(rs);
            } else {
                this.replayFused(rs);
            }
        }

        @Override
        public boolean isDone() {
            return this.done;
        }

        /** @return the item at the subscription's position (advancing it), or {@code null} when it has caught up with {@code size} */
        @Override
        @Nullable
        @SuppressWarnings("unchecked")
        public T poll(ReplaySubscription<T> rs) {
            int index = rs.index();
            if (index == this.size) {
                return null;
            }
            Object[] node = (Object[]) rs.node();
            if (node == null) {
                node = this.head;
                rs.node(node);
            }
            int tailIndex = rs.tailIndex();
            if (tailIndex == this.batchSize) {
                node = (Object[]) node[tailIndex];
                tailIndex = 0;
                rs.node(node);
            }
            Object v = node[tailIndex];
            rs.index(index + 1);
            rs.tailIndex(tailIndex + 1);
            return (T) v;
        }

        @Override
        public void clear(ReplaySubscription<T> rs) {
            rs.node(null);
        }

        @Override
        public boolean isEmpty(ReplaySubscription<T> rs) {
            return rs.index() == this.size;
        }

        @Override
        public int size(ReplaySubscription<T> rs) {
            return this.size - rs.index();
        }

        @Override
        public int size() {
            return this.size;
        }
    }

    static final class SizeAndTimeBoundReplayBuffer<T>
    implements ReplayBuffer<T> {
        /** Size bound supplied at construction. */
        final int limit;
        /** Maximum item age; subtracted from {@code scheduler.now(TimeUnit.MILLISECONDS)} to obtain the cut-off time. */
        final long maxAge;
        /** Time source used for item age and for the termination timestamp. */
        final Scheduler scheduler;
        int size;
        /** First node of the list; replay passes without a cursor start here. */
        volatile TimedNode<T> head;
        TimedNode<T> tail;
        /** Error recorded by {@code onError}, or {@code null}. */
        Throwable error;
        /** Marker value of {@link #done} meaning "not terminated yet". */
        static final long NOT_DONE = Long.MIN_VALUE;
        /** Termination timestamp taken from {@code scheduler.now(TimeUnit.MILLISECONDS)}, or {@link #NOT_DONE}. */
        volatile long done = Long.MIN_VALUE;

        /**
         * Creates an empty buffer whose head and tail are the same value-less node
         * with timestamp 0.
         *
         * @param limit     size bound
         * @param maxAge    maximum item age, compared against {@code scheduler.now(TimeUnit.MILLISECONDS)}
         * @param scheduler time source
         */
        SizeAndTimeBoundReplayBuffer(int limit, long maxAge, Scheduler scheduler) {
            this.limit = limit;
            this.maxAge = maxAge;
            this.scheduler = scheduler;
            // Typed as TimedNode<T> so it is assignable to the head/tail fields.
            TimedNode<T> h = new TimedNode<T>(null, 0L);
            this.tail = h;
            this.head = h;
        }

        /**
         * Tells whether this buffer terminated more than {@code maxAge} ago.
         *
         * @return {@code false} while not terminated; otherwise {@code true} when
         *         {@code now - maxAge} is strictly after the termination timestamp
         */
        @Override
        public boolean isExpired() {
            final long terminatedAt = this.done;
            if (terminatedAt == NOT_DONE) {
                return false;
            }
            final long cutoff = this.scheduler.now(TimeUnit.MILLISECONDS) - this.maxAge;
            return cutoff > terminatedAt;
        }

        /**
         * Non-fused drain loop: emits items from the subscription's cursor up to its
         * current demand, then the terminal signal once the buffer is done and the
         * cursor has reached the end of the list. Repeats while {@code rs.leave}
         * reports missed work.
         *
         * <p>A subscription without a cursor starts at {@code head}; while the buffer is
         * not terminated, nodes whose timestamp is at or before {@code now - maxAge}
         * are skipped first.
         *
         * @param rs the subscription to drain; the caller must have won {@code rs.enter()}
         */
        @SuppressWarnings("unchecked")
        void replayNormal(ReplaySubscription<T> rs) {
            int missed = 1;
            final CoreSubscriber<? super T> a = rs.actual();
            do {
                TimedNode<T> node = (TimedNode<T>) rs.node();
                if (node == null) {
                    node = this.head;
                    if (this.done == NOT_DONE) {
                        // Skip entries that are already too old for a new subscriber.
                        final long limit = this.scheduler.now(TimeUnit.MILLISECONDS) - this.maxAge;
                        TimedNode<T> next = node;
                        while (next != null && next.time <= limit) {
                            node = next;
                            next = (TimedNode<T>) node.get();
                        }
                    }
                }
                final long r = rs.requested();
                long e = 0L;
                while (e != r) {
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }
                    // Read done before looking at the next node.
                    final boolean d = this.done != NOT_DONE;
                    final TimedNode<T> next = (TimedNode<T>) node.get();
                    final boolean empty = next == null;
                    if (d && empty) {
                        rs.node(null);
                        Throwable ex = this.error;
                        if (ex != null) {
                            a.onError(ex);
                        } else {
                            a.onComplete();
                        }
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(next.value);
                    node = next;
                    ++e;
                }
                if (e == r) {
                    // Demand exhausted (possibly zero): a terminal signal needs no demand.
                    if (rs.isCancelled()) {
                        rs.node(null);
                        return;
                    }
                    final boolean d = this.done != NOT_DONE;
                    final boolean empty = node.get() == null;
                    if (d && empty) {
                        rs.node(null);
                        Throwable ex = this.error;
                        if (ex != null) {
                            a.onError(ex);
                        } else {
                            a.onComplete();
                        }
                        return;
                    }
                }
                if (e != 0L && r != Long.MAX_VALUE) {
                    rs.produced(e);
                }
                rs.node(node);
                missed = rs.leave(missed);
            } while (missed != 0);
        }

        /**
         * Fused drain loop: signals {@code onNext(null)} so the downstream polls, and
         * follows with the terminal signal when the buffer was already terminated
         * before that call. Repeats while {@code rs.leave} reports missed work.
         *
         * @param rs the subscription to drain; the caller must have won {@code rs.enter()}
         */
        void replayFused(ReplaySubscription<T> rs) {
            final CoreSubscriber<? super T> downstream = rs.actual();
            int pending = 1;
            do {
                if (rs.isCancelled()) {
                    rs.node(null);
                    return;
                }
                // Sample termination before signalling.
                final boolean terminated = this.done != Long.MIN_VALUE;
                downstream.onNext(null);
                if (terminated) {
                    final Throwable failure = this.error;
                    if (failure == null) {
                        downstream.onComplete();
                    } else {
                        downstream.onError(failure);
                    }
                    return;
                }
                pending = rs.leave(pending);
            } while (pending != 0);
        }

        /**
         * Records a terminal failure for this buffer.
         * <p>
         * The error is stored <em>before</em> {@code done} is stamped. The drain loops
         * ({@code replayNormal} / {@code replayFused}) test {@code done} first and only then
         * read {@code error}; stamping {@code done} first would leave a window in which a
         * concurrent drain sees the buffer as terminated with a {@code null} error and
         * signals {@code onComplete} instead of {@code onError}.
         *
         * @param ex the failure to deliver once a subscriber has drained the buffered items
         */
        @Override
        public void onError(Throwable ex) {
            long terminatedAt = this.scheduler.now(TimeUnit.MILLISECONDS);
            this.error = ex;
            // NOTE(review): relies on `done` acting as the publication flag (presumably volatile) — confirm at the field declaration.
            this.done = terminatedAt;
        }

        /**
         * @return the failure recorded by {@code onError}, or {@code null} if none has been stored
         */
        @Override
        @Nullable
        public Throwable getError() {
            return error;
        }

        /**
         * Marks the buffer as terminated by stamping {@code done} with the scheduler's
         * current time in milliseconds. No error is recorded.
         */
        @Override
        public void onComplete() {
            long terminatedAt = scheduler.now(TimeUnit.MILLISECONDS);
            done = terminatedAt;
        }

        /**
         * @return {@code true} once {@code done} holds a termination timestamp, i.e. it is
         *         no longer the {@code Long.MIN_VALUE} sentinel
         */
        @Override
        public boolean isDone() {
            long stamp = done;
            return stamp != Long.MIN_VALUE;
        }

        /**
         * Finds the node after which the still-fresh items for {@code rs} begin.
         * <p>
         * Starts from the subscription's stored cursor, or from the buffer head when the
         * cursor is {@code null}, and walks forward past every successor whose timestamp is
         * at or before {@code now - maxAge}.
         *
         * @param rs the subscription whose cursor is used as the starting point
         * @return the last node whose successor is either absent or not yet expired
         */
        @SuppressWarnings("unchecked")
        TimedNode<T> latestHead(ReplaySubscription<T> rs) {
            long cutoff = scheduler.now(TimeUnit.MILLISECONDS) - maxAge;
            TimedNode<T> cursor = (TimedNode<T>) rs.node();
            if (cursor == null) {
                cursor = head;
            }
            for (;;) {
                TimedNode<T> successor = cursor.get();
                if (successor == null || successor.time > cutoff) {
                    return cursor;
                }
                cursor = successor;
            }
        }

        /**
         * Takes the next non-expired item for {@code rs}, if any.
         * <p>
         * The age cutoff is recomputed after {@link #latestHead(ReplaySubscription)} returns,
         * so nodes that expired in between are skipped as well. On success the
         * subscription's cursor is moved to the node whose value is returned; when nothing
         * is available the cursor is left untouched.
         *
         * @param rs the polling subscription
         * @return the next fresh value, or {@code null} when no fresh node follows the cursor
         */
        @Override
        @Nullable
        public T poll(ReplaySubscription<T> rs) {
            TimedNode<T> start = latestHead(rs);
            long cutoff = scheduler.now(TimeUnit.MILLISECONDS) - maxAge;
            for (TimedNode<T> candidate = start.get(); candidate != null; candidate = candidate.get()) {
                if (candidate.time > cutoff) {
                    rs.node(candidate);
                    return candidate.value;
                }
            }
            return null;
        }

        /**
         * Discards the subscription's position in this buffer by storing a {@code null}
         * cursor. A {@code null} cursor makes {@code latestHead} start from the buffer head
         * on its next call.
         *
         * @param rs the subscription whose cursor is reset
         */
        @Override
        public void clear(ReplaySubscription<T> rs) {
            rs.node(null);
        }

        /**
         * @param rs the subscription being inspected
         * @return {@code true} when no node follows the position returned by
         *         {@link #latestHead(ReplaySubscription)} for {@code rs}
         */
        @Override
        public boolean isEmpty(ReplaySubscription<T> rs) {
            return latestHead(rs).get() == null;
        }

        /**
         * Counts the nodes linked after the position returned by
         * {@link #latestHead(ReplaySubscription)} for {@code rs}.
         *
         * @param rs the subscription being inspected
         * @return the number of nodes found, capped at {@code Integer.MAX_VALUE}
         */
        @Override
        public int size(ReplaySubscription<T> rs) {
            int counted = 0;
            TimedNode<T> cursor = latestHead(rs).get();
            while (cursor != null && counted != Integer.MAX_VALUE) {
                counted++;
                cursor = cursor.get();
            }
            return counted;
        }

        /**
         * Counts every node linked after the buffer head. No age check is applied here.
         *
         * @return the number of nodes found, capped at {@code Integer.MAX_VALUE}
         */
        @Override
        public int size() {
            int counted = 0;
            for (TimedNode<T> cursor = head.get();
                    cursor != null && counted != Integer.MAX_VALUE;
                    cursor = cursor.get()) {
                counted++;
            }
            return counted;
        }

        /**
         * @return the configured {@code limit}, i.e. the size at which {@code add} starts
         *         dropping the oldest node for every new one
         */
        @Override
        public int capacity() {
            return limit;
        }

        /**
         * Appends {@code value} as a new timestamped node and then trims the buffer.
         * <p>
         * Trimming happens in two steps:
         * <ol>
         *   <li>size bound: when {@code size} already equals {@code limit}, the head is
         *       advanced by one node instead of growing {@code size};</li>
         *   <li>age bound: nodes after the head whose timestamp is at or before
         *       {@code now - maxAge} are skipped, and the head is moved to the last skipped
         *       node.</li>
         * </ol>
         * The age scan always stops at the node that was just appended, even if that node
         * is itself at or below the cutoff (e.g. a zero max age, or the clock advancing
         * between the two {@code now()} reads). Without that stop the scan would run off the
         * end of the list and leave {@code head} and {@code size} untouched, keeping every
         * expired node reachable until the size bound evicted it.
         *
         * @param value the item to buffer
         */
        @Override
        public void add(T value) {
            TimedNode<T> added = new TimedNode<T>(value, this.scheduler.now(TimeUnit.MILLISECONDS));
            this.tail.set(added);
            this.tail = added;

            int s = this.size;
            if (s == this.limit) {
                // Buffer is full: forget the oldest node rather than growing.
                this.head = this.head.get();
            } else {
                this.size = s + 1;
            }

            long cutoff = this.scheduler.now(TimeUnit.MILLISECONDS) - this.maxAge;
            TimedNode<T> h = this.head;
            int removed = 0;
            TimedNode<T> next;
            while ((next = h.get()) != null) {
                // Stop at the first fresh node, or at the freshly appended one at the latest.
                if (next.time > cutoff || next == added) {
                    if (removed != 0) {
                        this.size -= removed;
                        this.head = h;
                    }
                    break;
                }
                h = next;
                ++removed;
            }
        }

        /**
         * Entry point for draining this buffer into {@code rs}.
         * <p>
         * Does nothing unless {@code rs.enter()} grants access to the drain loop. Otherwise
         * dispatches on the subscription's fusion mode: mode {@code 0} uses
         * {@code replayNormal}, any other mode uses {@code replayFused}.
         * NOTE(review): {@code 0} is presumably {@code Fuseable.NONE} — confirm.
         *
         * @param rs the subscription to drain
         */
        @Override
        public void replay(ReplaySubscription<T> rs) {
            if (rs.enter()) {
                if (rs.fusionMode() != 0) {
                    replayFused(rs);
                } else {
                    replayNormal(rs);
                }
            }
        }

        /**
         * Link of the buffer's singly-linked list. The inherited {@link AtomicReference}
         * holds the successor node ({@code null} while this node is the last one); the node
         * itself carries an immutable payload and timestamp.
         *
         * @param <T> the payload type
         */
        static final class TimedNode<T> extends AtomicReference<TimedNode<T>> {

            /** Payload of this node; may be {@code null}. */
            final T value;

            /** Timestamp supplied at construction; {@code add} passes the scheduler time in milliseconds. */
            final long time;

            /**
             * @param value the payload, possibly {@code null}
             * @param time  the timestamp to associate with the payload
             */
            TimedNode(@Nullable T value, long time) {
                this.time = time;
                this.value = value;
            }
        }
    }

    /**
     * Storage strategy behind a replaying source: it records items and a terminal state,
     * and drains them into individual {@link ReplaySubscription}s.
     *
     * @param <T> the buffered item type
     */
    interface ReplayBuffer<T> {

        /**
         * Stores a new item.
         *
         * @param value the item to buffer
         */
        void add(T value);

        /**
         * Records a terminal failure.
         *
         * @param ex the failure
         */
        void onError(Throwable ex);

        /**
         * @return the recorded failure, or {@code null} if there is none
         */
        @Nullable
        Throwable getError();

        /** Records normal termination. */
        void onComplete();

        /**
         * Drains buffered signals into the given subscription.
         *
         * @param rs the subscription to drain into
         */
        void replay(ReplaySubscription<T> rs);

        /**
         * @return {@code true} once a terminal signal has been recorded
         */
        boolean isDone();

        /**
         * Takes the next item available to the given subscription.
         *
         * @param rs the polling subscription
         * @return the next item, or {@code null} when none is available
         */
        @Nullable
        T poll(ReplaySubscription<T> rs);

        /**
         * Discards the given subscription's view of the buffer.
         *
         * @param rs the subscription to clear
         */
        void clear(ReplaySubscription<T> rs);

        /**
         * @param rs the subscription being inspected
         * @return {@code true} when nothing is available to {@code rs}
         */
        boolean isEmpty(ReplaySubscription<T> rs);

        /**
         * @param rs the subscription being inspected
         * @return the number of items available to {@code rs}
         */
        int size(ReplaySubscription<T> rs);

        /**
         * @return the number of items currently held by the buffer
         */
        int size();

        /**
         * @return the buffer's capacity
         */
        int capacity();

        /**
         * @return whether the buffer reports itself as expired; the exact criterion is
         *         left to the implementation
         */
        boolean isExpired();
    }

    /**
     * Per-subscriber state a {@link ReplayBuffer} works against while draining: the
     * downstream, the outstanding demand, a drain-loop guard and buffer-specific cursors.
     *
     * @param <T> the item type
     */
    interface ReplaySubscription<T>
            extends Fuseable.QueueSubscription<T>, InnerProducer<T> {

        /**
         * @return the downstream subscriber that receives the replayed signals
         */
        @Override
        CoreSubscriber<? super T> actual();

        /**
         * Attempts to enter the drain loop.
         *
         * @return {@code true} if the caller may run the drain loop; buffers return
         *         immediately from {@code replay} when this is {@code false}
         */
        boolean enter();

        /**
         * Attempts to leave the drain loop.
         *
         * @param missed the missed-work count the caller has processed so far
         * @return the remaining missed-work count; {@code 0} lets the drain loop exit
         */
        int leave(int missed);

        /**
         * Reports how many items a drain pass has emitted.
         *
         * @param n the number of items emitted
         */
        void produced(long n);

        /**
         * Stores the buffer-specific cursor.
         *
         * @param node the new cursor, or {@code null} to drop it
         */
        void node(@Nullable Object node);

        /**
         * @return the stored buffer-specific cursor, or {@code null} if none is set
         */
        @Nullable
        Object node();

        /**
         * @return the stored tail index
         */
        int tailIndex();

        /**
         * @param tailIndex the tail index to store
         */
        void tailIndex(int tailIndex);

        /**
         * @return the stored index
         */
        int index();

        /**
         * @param index the index to store
         */
        void index(int index);

        /**
         * @return the fusion mode of this subscription; buffers treat {@code 0} as the
         *         non-fused path
         */
        int fusionMode();

        /**
         * @return {@code true} once this subscription has been cancelled
         */
        boolean isCancelled();

        /**
         * @return the current outstanding demand; drain loops treat
         *         {@code Long.MAX_VALUE} as "do not account for produced items"
         */
        long requested();
    }
}

