/*
 * Decompiled with CFR 0.152.
 */
package org.davidmoten.rx.pool;

import com.github.davidmoten.guavamini.Preconditions;
import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.util.EmptyComponent;
import io.reactivex.plugins.RxJavaPlugins;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.davidmoten.rx.internal.LifoQueue;
import org.davidmoten.rx.pool.Checkin;
import org.davidmoten.rx.pool.DecoratingMember;
import org.davidmoten.rx.pool.Member;
import org.davidmoten.rx.pool.NonBlockingPool;
import org.davidmoten.rx.pool.PoolClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Single} that hands pool {@link Member}s to its subscribers.
 *
 * <p>Every state change (observer subscribed or disposed, member checked in,
 * member to be released, member to be health-checked) is pushed onto a queue
 * and followed by a call to {@link #drain()}. The drain loop is guarded by the
 * {@code wip} counter so that only one thread at a time processes the queues
 * and mutates {@link #observers}.
 *
 * @param <T> type of the pooled value
 */
final class MemberSingle<T> extends Single<Member<T>> implements Closeable {

    private static final Logger log = LoggerFactory.getLogger(MemberSingle.class);

    /** Subscribed observers plus per-observer ready flags; mutated by the drain loop helpers. */
    final Observers<T> observers;

    /** Sentinel placed on {@link #toBeRemoved} to request removal of every observer. */
    private final MemberSingleObserver<T> removeAll;

    /** Members that hold a value and are waiting to be handed out. */
    private final LifoQueue<DecoratingMember<T>> initializedAvailable;

    /** Members without a value, waiting for an {@link Initializer} to be scheduled. */
    private final SimplePlainQueue<DecoratingMember<T>> notInitialized;

    /** Members queued via {@link #addToBeReleased(DecoratingMember)}. */
    private final SimplePlainQueue<DecoratingMember<T>> toBeReleased;

    /** Members that the drain loop decided need a health check before being emitted. */
    private final SimplePlainQueue<DecoratingMember<T>> toBeChecked;

    /** Observers that subscribed and have not yet been registered in {@link #observers}. */
    private final SimplePlainQueue<MemberSingleObserver<T>> toBeAdded;

    /** Observers that were disposed and have not yet been removed from {@link #observers}. */
    private final SimplePlainQueue<MemberSingleObserver<T>> toBeRemoved;

    /** Work-in-progress counter; the caller that moves it from 0 to 1 runs the drain loop. */
    private final AtomicInteger wip = new AtomicInteger();

    /** Every member of the pool (fixed size {@code pool.maxSize}); used to dispose all values. */
    private final DecoratingMember<T>[] members;

    private final Scheduler scheduler;
    private final long createRetryIntervalMs;

    /** Disposables of the tasks scheduled via {@code scheduleDirect}; disposed on cancellation. */
    private final CompositeDisposable scheduled = new CompositeDisposable();

    final NonBlockingPool<T> pool;

    /** Number of initializations scheduled that have not yet been checked in. */
    private final AtomicLong initializeScheduled = new AtomicLong();

    private volatile boolean cancelled;

    /**
     * Creates the single for the given pool and pre-creates {@code pool.maxSize}
     * value-less members, all of which start in the not-initialized queue.
     *
     * @param pool owning pool, must not be null
     */
    MemberSingle(NonBlockingPool<T> pool) {
        Preconditions.checkNotNull(pool);
        this.notInitialized = new MpscLinkedQueue<>();
        this.initializedAvailable = new LifoQueue<>();
        this.toBeReleased = new MpscLinkedQueue<>();
        this.toBeChecked = new MpscLinkedQueue<>();
        this.toBeAdded = new MpscLinkedQueue<>();
        this.toBeRemoved = new MpscLinkedQueue<>();
        this.members = createMembersArray(pool.maxSize, pool.checkinDecorator);
        for (DecoratingMember<T> member : this.members) {
            this.notInitialized.offer(member);
        }
        this.scheduler = pool.scheduler;
        this.createRetryIntervalMs = pool.createRetryIntervalMs;
        this.observers = new Observers<>();
        this.pool = pool;
        this.removeAll = new MemberSingleObserver<T>(EmptyComponent.INSTANCE, this);
    }

    /**
     * Builds the fixed array of members, each created with a {@code null} value.
     *
     * @param poolMaxSize number of members to create
     * @param checkinDecorator decorator passed through to every {@link DecoratingMember}
     * @return array of length {@code poolMaxSize}
     */
    @SuppressWarnings("unchecked")
    private DecoratingMember<T>[] createMembersArray(int poolMaxSize,
            BiFunction<? super T, ? super Checkin, ? extends T> checkinDecorator) {
        // generic array creation is not possible, hence the raw array + unchecked assignment
        DecoratingMember<T>[] result = new DecoratingMember[poolMaxSize];
        for (int i = 0; i < result.length; ++i) {
            result[i] = new DecoratingMember<T>(null, checkinDecorator, this);
        }
        return result;
    }

    /**
     * Registers the observer. If the pool is already closed the observer is
     * signalled {@link PoolClosedException} immediately; otherwise it is queued
     * for addition and the drain loop is triggered.
     */
    @Override
    protected void subscribeActual(SingleObserver<? super Member<T>> observer) {
        log.debug("subscribeActual");
        MemberSingleObserver<T> o = new MemberSingleObserver<T>(observer, this);
        observer.onSubscribe(o);
        if (this.pool.isClosed()) {
            observer.onError(new PoolClosedException());
            return;
        }
        this.toBeAdded.offer(o);
        drain();
    }

    /**
     * Returns a member to the pool without touching the scheduled-initialization count.
     *
     * @param member member being returned
     */
    public void checkin(Member<T> member) {
        checkin(member, false);
    }

    /**
     * Returns a member to the available queue and triggers the drain loop.
     *
     * @param member member being returned; must be a {@link DecoratingMember}
     * @param decrementInitializeScheduled {@code true} when called on completion of an
     *        {@link Initializer}, so that the pending-initialization count is reduced
     */
    public void checkin(Member<T> member, boolean decrementInitializeScheduled) {
        log.debug("checking in {}", member);
        DecoratingMember<T> d = (DecoratingMember<T>) member;
        d.scheduleRelease();
        d.markAsChecked();
        this.initializedAvailable.offer(d);
        if (decrementInitializeScheduled) {
            this.initializeScheduled.decrementAndGet();
        }
        drain();
    }

    /**
     * Queues a member for release and triggers the drain loop, which schedules a {@link Releaser}.
     *
     * @param member member to release
     */
    public void addToBeReleased(DecoratingMember<T> member) {
        this.toBeReleased.offer(member);
        drain();
    }

    /** Marks this single as cancelled and disposes queues, scheduled tasks, values and observers. */
    public void cancel() {
        log.debug("cancel called");
        this.cancelled = true;
        disposeAll();
    }

    /**
     * Serialized processing loop. The caller that increments {@code wip} from zero
     * runs the loop; concurrent callers only bump the counter, which makes the
     * running loop go around again.
     */
    private void drain() {
        log.debug("drain called");
        if (this.wip.getAndIncrement() != 0) {
            // another thread is draining; it will observe the incremented counter
            return;
        }
        log.debug("drain loop starting");
        int missed = 1;
        do {
            processPendingQueues();
            Observers<T> obs = this.observers;
            log.debug("requested={}", obs.requested);
            long r = Math.min(obs.readyCount, obs.requested);
            long e = 0L; // emitted in this pass
            while (e != r && obs.readyCount > 0) {
                if (this.cancelled) {
                    disposeAll();
                    // NOTE(review): returns without decrementing wip, so no later call to
                    // drain() will enter the loop again after cancellation.
                    return;
                }
                DecoratingMember<T> m = this.initializedAvailable.poll();
                log.debug("poll of available members returns {}", m);
                if (m == null) {
                    // nothing initialized is available; try to initialize a value-less member
                    DecoratingMember<T> m2 = this.notInitialized.poll();
                    if (m2 == null || !trySchedulingInitializationNoDelay(r, e, m2)) {
                        break;
                    }
                } else if (!m.isReleasing() && !m.isChecking()) {
                    log.debug("trying to emit member");
                    if (shouldPerformHealthCheck(m)) {
                        log.debug("queueing member for health check {}", m);
                        this.toBeChecked.offer(m);
                    } else {
                        log.debug("no health check required for {}", m);
                        emit(obs, m);
                        log.debug("emitted");
                        ++e;
                    }
                }
                // a polled member that is releasing/checking is simply not re-queued here
                processPendingQueues();
            }
            missed = this.wip.addAndGet(-missed);
        } while (missed != 0);
    }

    /** Applies queued observer removals/additions, then schedules queued releases and checks. */
    private void processPendingQueues() {
        removeObservers();
        addObservers();
        scheduleReleasesNoDelay();
        scheduleChecksNoDelay();
    }

    /** Moves every queued observer from {@link #toBeAdded} into {@link #observers}. */
    private void addObservers() {
        MemberSingleObserver<T> o;
        while ((o = this.toBeAdded.poll()) != null) {
            this.observers.add(o);
        }
    }

    /**
     * Applies queued removals. On meeting the {@link #removeAll} sentinel all
     * observers are dropped and processing stops for this call.
     */
    private void removeObservers() {
        MemberSingleObserver<T> o;
        while ((o = this.toBeRemoved.poll()) != null) {
            if (o == this.removeAll) {
                this.observers.removeAll();
                return;
            }
            this.observers.remove(o);
        }
    }

    /**
     * Schedules initialization of {@code m} if emitted + already-scheduled
     * initializations is still below the demand {@code r}; otherwise puts
     * {@code m} back on the not-initialized queue.
     *
     * @param r demand computed by the drain loop
     * @param e number emitted so far in the current drain pass
     * @param m value-less member polled from the not-initialized queue
     * @return {@code true} if an {@link Initializer} was scheduled for {@code m}
     */
    private boolean trySchedulingInitializationNoDelay(long r, long e, DecoratingMember<T> m) {
        while (true) {
            long cs = this.initializeScheduled.get();
            if (e + cs >= r) {
                log.debug("insufficient demand to initialize {}", m);
                this.notInitialized.offer(m);
                return false;
            }
            if (this.initializeScheduled.compareAndSet(cs, cs + 1L)) {
                log.debug("scheduling member creation");
                this.scheduled.add(this.scheduler.scheduleDirect(new Initializer(m)));
                return true;
            }
            // CAS lost against a concurrent update; re-read and retry
        }
    }

    /** Decides, using the scheduler's current time, whether {@code m} needs a health check. */
    private boolean shouldPerformHealthCheck(DecoratingMember<T> m) {
        long now = this.scheduler.now(TimeUnit.MILLISECONDS);
        log.debug("schedule.now={}, lastCheck={}", now, m.lastCheckTime());
        return shouldPerformHealthCheck(m, this.pool.idleTimeBeforeHealthCheckMs, now);
    }

    /**
     * @param m member under consideration
     * @param idleTimeBeforeHealthCheckMs threshold in ms; values {@code <= 0} disable checks
     * @param now current time in ms
     * @return {@code true} if checks are enabled and at least the threshold has
     *         elapsed since {@code m.lastCheckTime()}
     */
    @VisibleForTesting
    static <T> boolean shouldPerformHealthCheck(DecoratingMember<T> m, long idleTimeBeforeHealthCheckMs, long now) {
        if (idleTimeBeforeHealthCheckMs <= 0L) {
            return false;
        }
        return now - m.lastCheckTime() >= idleTimeBeforeHealthCheckMs;
    }

    /** Schedules a {@link Checker} for each queued member that is not currently releasing. */
    private void scheduleChecksNoDelay() {
        DecoratingMember<T> m;
        while ((m = this.toBeChecked.poll()) != null) {
            if (m.isReleasing()) {
                continue;
            }
            log.debug("scheduling check of {}", m);
            m.markAsChecking();
            this.scheduled.add(this.scheduler.scheduleDirect(new Checker(m)));
        }
    }

    /** Marks each queued member as releasing and schedules a {@link Releaser} for it. */
    private void scheduleReleasesNoDelay() {
        DecoratingMember<T> m;
        while ((m = this.toBeReleased.poll()) != null) {
            log.debug("scheduling release of {}", m);
            m.markAsReleasing();
            this.scheduled.add(this.scheduler.scheduleDirect(new Releaser(m)));
        }
    }

    /**
     * Picks a ready observer (searching from the slot after {@code obs.index},
     * wrapping around), marks it not-ready, and schedules delivery of {@code m}
     * on a fresh worker. The drain loop only calls this while
     * {@code obs.readyCount > 0}.
     */
    private void emit(Observers<T> obs, DecoratingMember<T> m) {
        // NOTE(review): obs.index is read but never advanced after an emission here;
        // confirm whether round-robin advancement was intended.
        int index = obs.index;
        int size = obs.ready.size();
        int nextIndex = (index + 1) % size;
        while (nextIndex != index && !obs.ready.get(nextIndex)) {
            nextIndex = (nextIndex + 1) % size;
        }
        obs.ready.set(nextIndex, Boolean.FALSE);
        --obs.readyCount;
        --obs.requested;
        MemberSingleObserver<T> oNext = obs.observers.get(nextIndex);
        Scheduler.Worker worker = this.scheduler.createWorker();
        worker.schedule(new Emitter<T>(worker, oNext, m));
    }

    /** Equivalent to {@link #cancel()}. */
    @Override
    public void close() {
        cancel();
    }

    /** Clears member queues, disposes scheduled tasks and values, and requests observer removal. */
    private void disposeAll() {
        this.initializedAvailable.clear();
        this.toBeReleased.clear();
        this.notInitialized.clear();
        disposeValues();
        removeAllObservers();
    }

    /** Disposes all scheduled tasks and then the value of every member. */
    private void disposeValues() {
        this.scheduled.dispose();
        for (DecoratingMember<T> member : this.members) {
            member.disposeValue();
        }
    }

    /** Queues the remove-all sentinel and triggers the drain loop. */
    private void removeAllObservers() {
        this.toBeRemoved.offer(this.removeAll);
        drain();
    }

    /**
     * Queues a single observer for removal and triggers the drain loop.
     *
     * @param inner observer to remove
     */
    void remove(@NonNull MemberSingleObserver<T> inner) {
        this.toBeRemoved.offer(inner);
        drain();
    }

    /**
     * Puts a member (whose value has been disposed by the caller) back on the
     * not-initialized queue and triggers the drain loop.
     *
     * @param m member to make available for re-initialization
     */
    public void release(DecoratingMember<T> m) {
        log.debug("adding released member to notInitialized queue {}", m);
        this.notInitialized.offer(m);
        drain();
    }

    /**
     * Subscription handle given to a downstream observer. Holds the parent in
     * the atomic reference; a {@code null} reference means disposed.
     */
    static final class MemberSingleObserver<T> extends AtomicReference<MemberSingle<T>> implements Disposable {
        private static final long serialVersionUID = -7650903191002190468L;
        final SingleObserver<? super Member<T>> child;

        MemberSingleObserver(SingleObserver<? super Member<T>> child, MemberSingle<T> parent) {
            this.child = child;
            this.lazySet(parent);
        }

        /** Clears the parent reference and, the first time only, asks the parent to remove this observer. */
        @Override
        public void dispose() {
            MemberSingle<T> parent = this.getAndSet(null);
            if (parent != null) {
                parent.remove(this);
            }
        }

        @Override
        public boolean isDisposed() {
            return this.get() == null;
        }
    }

    /** Task that delivers one member to one observer and then disposes that observer's handle. */
    private static final class Emitter<T> implements Runnable {
        private final Scheduler.Worker worker;
        private final MemberSingleObserver<T> observer;
        private final Member<T> m;

        Emitter(Scheduler.Worker worker, MemberSingleObserver<T> observer, Member<T> m) {
            this.worker = worker;
            this.observer = observer;
            this.m = m;
        }

        @Override
        public void run() {
            // the worker was created solely for this task
            this.worker.dispose();
            try {
                this.observer.child.onSuccess(this.m);
            } catch (Throwable e) {
                RxJavaPlugins.onError(e);
            } finally {
                this.observer.dispose();
            }
        }
    }

    /**
     * Observer bookkeeping: parallel lists of observers and ready flags. An
     * observer is ready from the moment it is added until a member has been
     * assigned to it by {@code emit}. {@code requested} and {@code readyCount}
     * are incremented and decremented together.
     */
    private static final class Observers<T> {
        final List<MemberSingleObserver<T>> observers = new ArrayList<MemberSingleObserver<T>>();
        final List<Boolean> ready = new ArrayList<Boolean>();
        int readyCount = 0;
        int index = 0;
        int requested = 0;

        Observers() {
        }

        /** Appends a new, ready observer. */
        void add(MemberSingleObserver<T> o) {
            this.observers.add(o);
            this.ready.add(Boolean.TRUE);
            ++this.readyCount;
            ++this.requested;
        }

        /** Removes the observer if present, shifting {@code index} left when needed. */
        void remove(MemberSingleObserver<T> o) {
            int i = this.observers.indexOf(o);
            if (i == -1) {
                return;
            }
            if (this.ready.get(i)) {
                // the observer was still waiting for a member: its demand disappears with it
                --this.readyCount;
                --this.requested;
            }
            if (this.index >= i && this.index > 0) {
                --this.index;
            }
            this.observers.remove(i);
            this.ready.remove(i);
        }

        /** Drops every observer and resets all counters. */
        void removeAll() {
            this.observers.clear();
            this.ready.clear();
            this.readyCount = 0;
            this.index = 0;
            this.requested = 0;
        }
    }

    /**
     * Task that runs the pool's health check on a member. A healthy member is
     * returned to the available queue; an unhealthy one has its value disposed
     * and is re-queued for initialization after {@code pool.createRetryIntervalMs}.
     * A health check that throws is reported and treated as unhealthy so that
     * the member is not lost to the pool.
     */
    final class Checker implements Runnable {
        private final DecoratingMember<T> m;

        Checker(DecoratingMember<T> m) {
            this.m = m;
        }

        @Override
        public void run() {
            boolean healthy;
            try {
                log.debug("performing health check on {}", this.m);
                healthy = MemberSingle.this.pool.healthCheck.test(this.m.value());
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
                healthy = false;
            }
            try {
                if (healthy) {
                    this.m.markAsChecked();
                    MemberSingle.this.initializedAvailable.offer(this.m);
                    MemberSingle.this.drain();
                } else {
                    log.debug("failed health check");
                    this.m.disposeValue();
                    log.debug("scheduling recreation of member {}", this.m);
                    MemberSingle.this.scheduled.add(MemberSingle.this.scheduler.scheduleDirect(() -> {
                        log.debug("recreating member after failed health check {}", this.m);
                        MemberSingle.this.notInitialized.offer(this.m);
                        MemberSingle.this.drain();
                    }, MemberSingle.this.pool.createRetryIntervalMs, TimeUnit.MILLISECONDS));
                }
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }
    }

    /** Task that disposes a member's value and hands the member back for re-initialization. */
    final class Releaser implements Runnable {
        private final DecoratingMember<T> m;

        Releaser(DecoratingMember<T> m) {
            this.m = m;
        }

        @Override
        public void run() {
            try {
                this.m.disposeValue();
                MemberSingle.this.release(this.m);
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }
    }

    /**
     * Task that creates a value via {@code pool.factory} and checks the member
     * in. On failure the error is reported and, unless cancelled, this same
     * task is rescheduled after {@code createRetryIntervalMs}.
     */
    @VisibleForTesting
    final class Initializer implements Runnable {
        private final DecoratingMember<T> m;

        Initializer(DecoratingMember<T> m) {
            this.m = m;
        }

        @Override
        public void run() {
            if (MemberSingle.this.cancelled) {
                return;
            }
            try {
                log.debug("creating value");
                T value = MemberSingle.this.pool.factory.call();
                this.m.setValueAndClearReleasingFlag(value);
                MemberSingle.this.checkin(this.m, true);
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
                if (!MemberSingle.this.cancelled) {
                    MemberSingle.this.scheduled.add(MemberSingle.this.scheduler.scheduleDirect(
                            this, MemberSingle.this.createRetryIntervalMs, TimeUnit.MILLISECONDS));
                }
            }
        }
    }
}

