/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.device.session;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import lombok.Generated;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.session.DeviceSessionEvent;
import org.jetlinks.core.device.session.DeviceSessionInfo;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

public abstract class AbstractDeviceSessionManager
implements DeviceSessionManager {
    /** Class-wide logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractDeviceSessionManager.class);
    /** Atomic accessor for the {@link #closeWip} counter. */
    private static final AtomicLongFieldUpdater<AbstractDeviceSessionManager> CLOSE_WIP = AtomicLongFieldUpdater.newUpdater(AbstractDeviceSessionManager.class, "closeWip");
    /** Sessions held by this manager instance, keyed by device id. */
    protected final Map<String, DeviceSessionRef> localSessions = new NonBlockingHashMap(2048);
    /** Handlers registered through {@code listenEvent} and invoked by {@code fireEvent}. */
    private final List<Function<DeviceSessionEvent, Mono<Void>>> sessionEventHandlers = new CopyOnWriteArrayList<Function<DeviceSessionEvent, Mono<Void>>>();
    /** Collects the background subscriptions created in {@code init()}; disposed by {@code shutdown()}. */
    protected final Disposable.Composite disposable = Disposables.composite();
    /** Maximum time a session loader may take before {@code DeviceSessionRef.tryLoad} fails it with a timeout. */
    private Duration sessionLoadTimeout = Duration.ofSeconds(5L);
    /** Period between two runs of the periodic session check started in {@code init()}. */
    private Duration sessionCheckInterval = Duration.ofSeconds(30L);
    /** Initial delay before the first periodic session check. */
    private Duration sessionCheckDelay = Duration.ofMinutes(2L);
    /** Concurrency passed to {@code flatMap} in {@code checkSession()}; defaults to 64 x available processors unless the system property is set. */
    private int sessionCheckConcurrency = Integer.getInteger("jetlinks.session.check.concurrency", Runtime.getRuntime().availableProcessors() * 64);
    /** Threshold of in-progress closes above which {@code closeSession} tries to defer the close to {@link #closeSink}. */
    private int sessionCloseConcurrency = Integer.getInteger("jetlinks.session.close.concurrency", 3000);
    /** Receives sessions whose close was deferred by {@code closeSession}; drained in batches by {@code init()}. */
    protected Sinks.Many<DeviceSession> closeSink = Reactors.createMany();
    /** Number of close operations counted as in progress; incremented in {@code closeSession}, decremented in {@code closeSession0}. */
    private volatile long closeWip = 0L;

    /**
     * Returns the id of this server; it is passed to {@code DeviceOperator.online(...)} and compared
     * with the device's connection server id in {@code removeFromCluster}.
     */
    public abstract String getCurrentServerId();

    /**
     * Invoked by {@code closeSession0} after a local session was closed. A {@code true} result is
     * treated there as "the session still exists", in which case the device is not marked offline.
     * NOTE(review): the exact cluster semantics are implementation-defined; confirm in subclasses.
     *
     * @param var1 the session that was just closed
     */
    protected abstract Mono<Boolean> initSessionConnection(DeviceSession var1);

    /**
     * Removes the session of a device outside of this instance; used by {@code remove(deviceId, false)},
     * which adds the emitted count to the number of locally removed sessions.
     *
     * @param var1 device id
     */
    protected abstract Mono<Long> removeRemoteSession(String var1);

    /** Session count outside of this instance; added to the local count by {@code totalSessions(false)}. */
    protected abstract Mono<Long> getRemoteTotalSessions();

    /**
     * Whether a session for the device is alive outside of this instance; used by {@code isAlive}
     * and {@code doRegister}.
     *
     * @param var1 device id
     */
    protected abstract Mono<Boolean> remoteSessionIsAlive(String var1);

    /**
     * Remote counterpart of {@code checkLocalAlive}; used by {@code checkAlive(deviceId, false)} when
     * no alive local session exists.
     *
     * @param var1 device id
     */
    protected abstract Mono<Boolean> checkRemoteSessionIsAlive(String var1);

    /**
     * Session info held outside of this instance.
     *
     * @param var1 server id; {@code getSessionInfo()} passes {@code null} (presumably meaning
     *             "all remote servers" - confirm in implementations)
     */
    protected abstract Flux<DeviceSessionInfo> remoteSessions(String var1);

    /**
     * Starts the background work of this manager and registers it in {@link #disposable}:
     * <ul>
     *   <li>a periodic session check on a dedicated single-thread scheduler, first run after
     *       {@code sessionCheckDelay}, then every {@code sessionCheckInterval};</li>
     *   <li>a consumer of {@link #closeSink} that closes deferred sessions in batches of up to
     *       1000 sessions or one second.</li>
     * </ul>
     */
    public void init() {
        Scheduler scheduler = Schedulers.newSingle("device-session-checker");
        this.disposable.add(scheduler);
        this.disposable.add(
            Flux.interval(this.sessionCheckDelay, this.sessionCheckInterval, scheduler)
                // a slow check must not queue up ticks
                .onBackpressureDrop()
                .concatMap(time -> this.executeInterval())
                .subscribe());
        this.disposable.add(
            this.closeSink
                .asFlux()
                .bufferTimeout(1000, Duration.ofSeconds(1L))
                .onBackpressureBuffer()
                .concatMap(batch -> Flux
                    .fromIterable(batch)
                    .filter(session -> {
                        if (!this.localSessions.containsKey(session.getDeviceId())) {
                            return true;
                        }
                        // The device registered again in the meantime, so the deferred close is
                        // skipped. closeSession() already counted it in CLOSE_WIP and
                        // closeSession0() will never run for it, so release the slot here.
                        CLOSE_WIP.decrementAndGet(this);
                        return false;
                    })
                    .flatMap(this::closeSessionSafe)
                    .then(), 0)
                .subscribe());
    }

    /**
     * Runs one periodic session check. Errors are logged and swallowed so that a single failed
     * run does not terminate the interval pipeline built in {@code init()}.
     *
     * @return a Mono completing when the check finished (successfully or not)
     */
    protected Mono<Void> executeInterval() {
        return this.checkSession().onErrorResume(err -> {
            log.warn("check device sessions error", err);
            return Mono.empty();
        });
    }

    /** Disposes everything registered in {@link #disposable}, i.e. the tasks started by {@code init()}. */
    public void shutdown() {
        disposable.dispose();
    }

    /** @return {@code true} once {@link #disposable} has been disposed */
    public boolean isShutdown() {
        return disposable.isDisposed();
    }

    /**
     * Looks up the local session of a device, checking that it is still alive.
     * Equivalent to {@code getSession(deviceId, true)}.
     *
     * @param deviceId device id
     * @return the session, or empty when there is none
     */
    public Mono<DeviceSession> getSession(String deviceId) {
        final boolean unregisterWhenNotAlive = true;
        return getSession(deviceId, unregisterWhenNotAlive);
    }

    /**
     * Looks up the local session of a device.
     *
     * @param deviceId               device id; an empty id yields an empty result
     * @param unregisterWhenNotAlive when {@code true} the session goes through
     *                               {@code DeviceSessionRef.checkSessionAlive()}, which closes it
     *                               if it is no longer alive; otherwise it is returned as is
     * @return the session, or empty when no usable local reference exists
     */
    public Mono<DeviceSession> getSession(String deviceId, boolean unregisterWhenNotAlive) {
        if (!ObjectUtils.isEmpty((Object) deviceId)) {
            DeviceSessionRef found = localSessions.get(deviceId);
            if (found != null && !found.isDisposed()) {
                return unregisterWhenNotAlive ? found.checkSessionAlive() : found.ref();
            }
        }
        return Mono.empty();
    }

    /** @return all local sessions, each resolved through its reference */
    public Flux<DeviceSession> getSessions() {
        return Flux
            .fromIterable(localSessions.values())
            .flatMap(sessionRef -> sessionRef.ref());
    }

    /**
     * Runs the alive check of the local reference registered for {@code id}.
     *
     * @param id device id
     * @return the alive session, or empty when there is no reference or the session is not alive
     */
    private Mono<DeviceSession> checkSessionAlive(String id) {
        DeviceSessionRef target = localSessions.get(id);
        return target != null ? target.checkSessionAlive() : Mono.empty();
    }

    /**
     * Removes the session of a device.
     *
     * @param deviceId  device id
     * @param onlyLocal when {@code false} the remote session is removed after the local one
     * @return number of removed sessions (local plus remote when requested)
     */
    public final Mono<Long> remove(String deviceId, boolean onlyLocal) {
        Mono<Long> local = removeLocalSession(deviceId);
        if (onlyLocal) {
            return local;
        }
        Mono<Long> remote = removeRemoteSession(deviceId);
        return Flux.concat(local, remote).reduce(Math::addExact);
    }

    /**
     * Closes the local session of a device if its loaded session matches the predicate.
     *
     * @param deviceId  device id
     * @param predicate condition evaluated against the loaded session
     * @return number of removed sessions; zero when no local reference exists
     */
    public Mono<Long> remove(String deviceId, Predicate<DeviceSession> predicate) {
        DeviceSessionRef target = localSessions.get(deviceId);
        if (target == null) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        return target.close(predicate);
    }

    /**
     * Tells whether a session for the device exists.
     *
     * @param deviceId  device id
     * @param onlyLocal when {@code false}, {@link #remoteSessionIsAlive(String)} is consulted if
     *                  there is no local session
     * @return {@code true} when a session was found
     */
    public final Mono<Boolean> isAlive(String deviceId, boolean onlyLocal) {
        Mono<Boolean> localAlive = getSession(deviceId).hasElement();
        if (onlyLocal) {
            return localAlive;
        }
        return localAlive.flatMap(alive -> alive ? Reactors.ALWAYS_TRUE : remoteSessionIsAlive(deviceId));
    }

    /**
     * Like {@code isAlive}, but goes through {@link #checkLocalAlive(String)} (which also refreshes
     * the device's connection info) and {@link #checkRemoteSessionIsAlive(String)}.
     *
     * @param deviceId  device id
     * @param onlyLocal when {@code true} only the local check is performed
     * @return {@code true} when an alive session was found
     */
    public Mono<Boolean> checkAlive(String deviceId, boolean onlyLocal) {
        Mono<Boolean> localAlive = checkLocalAlive(deviceId);
        if (onlyLocal) {
            return localAlive;
        }
        return localAlive.flatMap(alive -> alive ? Reactors.ALWAYS_TRUE : checkRemoteSessionIsAlive(deviceId));
    }

    /**
     * Checks the local session of a device and, if it has an operator, re-publishes its
     * connection info via {@link #syncConnectionInfo}.
     *
     * @param deviceId device id
     * @return {@code true} after the connection info was synced; {@code false} when there is no
     *         alive local session or the session has no operator
     */
    protected final Mono<Boolean> checkLocalAlive(String deviceId) {
        return getSession(deviceId)
            .flatMap(session -> {
                if (session.getOperator() == null) {
                    return Reactors.ALWAYS_FALSE;
                }
                return syncConnectionInfo(session.getOperator(), session);
            })
            .defaultIfEmpty(false);
    }

    /**
     * Marks the device as online on this server using the session's client address (empty string
     * when the address is absent) and an online time of {@code -1}.
     *
     * @param device  operator of the device
     * @param session session providing the client address
     * @return always {@code true} once {@code online(...)} completed
     */
    protected final Mono<Boolean> syncConnectionInfo(DeviceOperator device, DeviceSession session) {
        String serverId = getCurrentServerId();
        String address = session.getClientAddress().map(String::valueOf).orElse("");
        return device.online(serverId, address, -1L).thenReturn(true);
    }

    /**
     * Counts sessions.
     *
     * @param onlyLocal when {@code true} only the size of {@link #localSessions} is reported,
     *                  otherwise {@link #getRemoteTotalSessions()} is added to it
     * @return the session count
     */
    public final Mono<Long> totalSessions(boolean onlyLocal) {
        // Map.size() is an int: widen it explicitly so the Mono really carries a Long
        // (boxing the int directly would put an Integer into a Mono<Long>).
        Mono<Long> total = Mono.just((long) this.localSessions.size());
        if (onlyLocal) {
            return total;
        }
        return Mono.zip(total, this.getRemoteTotalSessions(), Math::addExact);
    }

    /** @return info of the local sessions followed by {@code remoteSessions(null)} */
    public final Flux<DeviceSessionInfo> getSessionInfo() {
        Flux<DeviceSessionInfo> local = getLocalSessionInfo();
        Flux<DeviceSessionInfo> remote = remoteSessions(null);
        return Flux.concat(local, remote);
    }

    /**
     * Session info of one server.
     *
     * @param serverId server id; when equal to {@link #getCurrentServerId()} the local sessions
     *                 are reported, otherwise the request is delegated to {@code remoteSessions}
     * @return session info of that server
     */
    public final Flux<DeviceSessionInfo> getSessionInfo(String serverId) {
        boolean local = getCurrentServerId().equals(serverId);
        return local ? getLocalSessionInfo() : remoteSessions(serverId);
    }

    /** @return info of every local reference whose session is already loaded */
    public final Flux<DeviceSessionInfo> getLocalSessionInfo() {
        return Flux
            .fromIterable(localSessions.values())
            // references that are still loading have no session yet and are skipped
            .mapNotNull(sessionRef -> sessionRef.loaded)
            .map(loadedSession -> DeviceSessionInfo.of(getCurrentServerId(), loadedSession));
    }

    public Mono<DeviceSession> compute(@Nonnull String deviceId, Mono<DeviceSession> creator, Function<DeviceSession, Mono<DeviceSession>> updater) {
        DeviceSessionRef ref = this.localSessions.compute(deviceId, (? super K _id, ? super V old) -> {
            if (old == null) {
                if (creator == null) {
                    return null;
                }
                return this.newDeviceSessionRef((String)_id, this, creator);
            }
            if (updater == null) {
                return old;
            }
            old.update(s -> s.flatMap(updater));
            return old;
        });
        return ref == null ? Mono.empty() : ref.ref();
    }

    public final Mono<DeviceSession> compute(@Nonnull String deviceId, @Nonnull Function<Mono<DeviceSession>, Mono<DeviceSession>> computer) {
        return this.localSessions.compute(deviceId, (? super K _id, ? super V old) -> {
            if (old != null) {
                old.update(computer);
                return old;
            }
            return this.newDeviceSessionRef((String)_id, this, (Mono<DeviceSession>)((Mono)computer.apply(Mono.empty())));
        }).ref();
    }

    /**
     * Handles the transition from {@code old} to {@code newSession}. When the old session reports
     * a change and the new session has an operator, the old session is closed and the device is
     * marked online on this server before {@link #handleSessionCompute} is applied.
     *
     * @param old        previously loaded session, may be {@code null}
     * @param newSession freshly loaded session
     * @return the session produced by {@link #handleSessionCompute}
     */
    private Mono<DeviceSession> handleSessionCompute0(DeviceSession old, DeviceSession newSession) {
        boolean replaced = old != null && old.isChanged(newSession) && newSession.getOperator() != null;
        if (!replaced) {
            return handleSessionCompute(old, newSession);
        }
        log.info("device [{}] session [{}] changed to [{}]", old.getDeviceId(), old, newSession);
        old.close();
        DeviceOperator operator = newSession.getOperator();
        String serverId = getCurrentServerId();
        String address = newSession.getClientAddress().map(InetSocketAddress::toString).orElse(null);
        return operator
            .online(serverId, address, -1L)
            .then(handleSessionCompute(old, newSession));
    }

    /**
     * Extension point applied after a session was loaded; this default implementation simply
     * emits {@code newSession}.
     *
     * @param old        previously loaded session, may be {@code null}
     * @param newSession freshly loaded session
     * @return the session to use
     */
    protected Mono<DeviceSession> handleSessionCompute(DeviceSession old, DeviceSession newSession) {
        Mono<DeviceSession> result = Mono.just(newSession);
        return result;
    }

    /**
     * Closes a session via {@code closeSession0}, logging and suppressing any error.
     *
     * @param session session to close
     * @return a Mono that never fails
     */
    protected final Mono<Void> closeSessionSafe(DeviceSession session) {
        Function<Throwable, Mono<Void>> logAndIgnore = err -> {
            log.warn("close session [{}] error", session.getDeviceId(), err);
            return Mono.empty();
        };
        return closeSession0(session).onErrorResume(logAndIgnore);
    }

    /**
     * Closes a session and publishes the matching {@code unregister} event.
     * <p>
     * The caller must already have counted this close in {@code CLOSE_WIP}; the counter is
     * released here, either immediately (no operator / manager shut down) or when the returned
     * Mono terminates or is cancelled.
     *
     * @param session session to close
     * @return a Mono completing once the device state was updated and the event handlers ran
     */
    private Mono<Void> closeSession0(DeviceSession session) {
        long now = System.currentTimeMillis();
        try {
            session.close();
        } catch (Throwable err) {
            // closing is best-effort: the bookkeeping below must run regardless,
            // but the failure should at least be visible
            log.warn("close device [{}] session [{}] error", session.getDeviceId(), session, err);
        }
        if (session.getOperator() == null || this.isShutdown()) {
            CLOSE_WIP.decrementAndGet(this);
            return Mono.empty();
        }
        return this
            .initSessionConnection(session)
            .flatMap(alive -> {
                // the device may still be connected elsewhere or may have re-registered locally
                boolean sessionExists = alive || this.localSessions.containsKey(session.getDeviceId());
                if (sessionExists) {
                    log.info("device [{}] session [{}] closed,but session still exists!", session.getDeviceId(), session);
                    return this.fireEvent(DeviceSessionEvent.of(now, DeviceSessionEvent.Type.unregister, session, true));
                }
                log.info("device [{}] session [{}] closed", session.getDeviceId(), session);
                return session
                    .getOperator()
                    .offline()
                    .then(this.fireEvent(DeviceSessionEvent.of(now, DeviceSessionEvent.Type.unregister, session, false)));
            })
            // doFinally also runs on cancellation, so the counter cannot leak
            .doFinally(signal -> CLOSE_WIP.decrementAndGet(this));
    }

    /** @return the current value of the in-progress close counter */
    protected long getCloseWip() {
        long inProgress = CLOSE_WIP.get(this);
        return inProgress;
    }

    /**
     * Closes a session, counting it in {@code CLOSE_WIP}. When more than
     * {@code sessionCloseConcurrency} closes are in progress the session is handed to
     * {@link #closeSink} instead; if that hand-off fails the close is performed directly.
     *
     * @param session session to close
     * @return empty immediately when the close was deferred, otherwise the close operation
     */
    protected final Mono<Void> closeSession(DeviceSession session) {
        long inProgress = CLOSE_WIP.incrementAndGet(this);
        if (inProgress > (long) sessionCloseConcurrency) {
            boolean queued = closeSink.tryEmitNext(session).isSuccess();
            if (queued) {
                return Mono.empty();
            }
        }
        return closeSession0(session);
    }

    /**
     * Closes the local reference of a device, unless the call originates from that very
     * reference (detected through the {@code DeviceSessionRef} entry in the subscriber context).
     *
     * @param deviceId device id
     * @return number of removed sessions
     */
    protected final Mono<Long> removeLocalSession(String deviceId) {
        return Mono.deferContextual(ctx -> {
            DeviceSessionRef caller = ctx.<DeviceSessionRef>getOrEmpty(DeviceSessionRef.class).orElse(null);
            DeviceSessionRef current = localSessions.get(deviceId);
            boolean closable = current != null && current != caller;
            return closable ? current.close() : Reactors.ALWAYS_ZERO_LONG;
        });
    }

    /**
     * Registers a newly loaded session: marks the device online on this server and fires a
     * {@code register} event. When the device is already alive remotely the online time is
     * passed as {@code -1}, otherwise the session's connect time is used.
     *
     * @param session newly loaded session
     * @return the session, or empty when it has no operator or the remote check emitted nothing
     */
    private Mono<DeviceSession> doRegister(DeviceSession session) {
        if (session.getOperator() == null) {
            return Mono.empty();
        }
        return remoteSessionIsAlive(session.getDeviceId())
            .flatMap(alive -> {
                DeviceOperator operator = session.getOperator();
                String serverId = getCurrentServerId();
                String address = session.getClientAddress().map(InetSocketAddress::toString).orElse(null);
                long onlineTime = alive ? -1L : session.connectTime();
                return operator
                    .online(serverId, address, onlineTime)
                    .then(fireEvent(DeviceSessionEvent.of(session.connectTime(), DeviceSessionEvent.Type.register, session, alive)));
            })
            .thenReturn(session);
    }

    /**
     * Delivers an event to every registered handler. A failing handler is logged and does not
     * affect the others.
     *
     * @param event event to publish
     * @return a Mono completing when all handlers finished
     */
    protected Mono<Void> fireEvent(DeviceSessionEvent event) {
        if (!sessionEventHandlers.isEmpty()) {
            return Flux
                .fromIterable(sessionEventHandlers)
                .flatMap(handler -> Mono
                    // defer so that an exception thrown by apply() is handled like an error signal
                    .defer(() -> handler.apply(event))
                    .onErrorResume(err -> {
                        log.error("fire session event error {}", event, err);
                        return Mono.empty();
                    }))
                .then();
        }
        return Mono.empty();
    }

    /**
     * Re-publishes the connection info of a locally loaded session.
     *
     * @param deviceId device id
     * @return {@code true} after the device was marked online on this server; empty when there
     *         is no local reference, no loaded session, or the session has no operator
     */
    protected Mono<Boolean> doInit(String deviceId) {
        DeviceSessionRef ref = this.localSessions.get(deviceId);
        if (ref == null) {
            return Mono.empty();
        }
        // 'loaded' is volatile and may be cleared concurrently: read it exactly once
        DeviceSession session = ref.loaded;
        if (session == null) {
            return Mono.empty();
        }
        DeviceOperator device = session.getOperator();
        if (device == null) {
            return Mono.empty();
        }
        return this.syncConnectionInfo(device, session);
    }

    /**
     * Removes and disposes the local reference of a device and closes its loaded session. If the
     * session has an operator, the device is additionally marked offline when its recorded
     * connection server is this server, and an {@code unregister} event is fired - unless the
     * device has re-registered locally in the meantime.
     *
     * @param deviceId device id
     * @return {@code 1} when a loaded session was removed, otherwise {@code 0}
     */
    protected Mono<Long> removeFromCluster(String deviceId) {
        DeviceSessionRef removed = localSessions.remove(deviceId);
        if (removed == null) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        removed.dispose();
        DeviceSession session = removed.loaded;
        if (session == null) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        session.close();
        if (session.getOperator() == null) {
            return Reactors.ALWAYS_ONE_LONG;
        }
        long now = System.currentTimeMillis();
        return session
            .getOperator()
            .getConnectionServerId()
            .map(getCurrentServerId()::equals)
            .defaultIfEmpty(false)
            .flatMap(sameServer -> {
                if (localSessions.containsKey(deviceId)) {
                    return Mono.<Void>empty();
                }
                Mono<Void> before = sameServer
                    ? session.getOperator().offline().then()
                    : Mono.<Void>empty();
                return before.then(fireEvent(DeviceSessionEvent.of(now, DeviceSessionEvent.Type.unregister, session, !sameServer)));
            })
            .thenReturn(1L);
    }

    /**
     * Registers a session event handler.
     *
     * @param handler handler invoked by {@code fireEvent}
     * @return a handle that removes the handler again when disposed
     */
    public Disposable listenEvent(Function<DeviceSessionEvent, Mono<Void>> handler) {
        sessionEventHandlers.add(handler);
        return () -> {
            sessionEventHandlers.remove(handler);
        };
    }

    /**
     * Runs the alive check on every local reference that already has a loaded session, with at
     * most {@code sessionCheckConcurrency} checks in flight. Failures of individual checks are
     * logged and ignored.
     *
     * @return a Mono completing when all checks finished
     */
    protected Mono<Void> checkSession() {
        return Flux
            .fromIterable(localSessions.values())
            .filter(candidate -> candidate.loaded != null)
            .flatMap(
                candidate -> candidate
                    .checkSessionAlive()
                    .onErrorResume(err -> {
                        log.warn("check session alive error", err);
                        return Mono.empty();
                    }),
                sessionCheckConcurrency)
            .then();
    }

    /**
     * Factory for references backed by a session loader; subclasses may override it to supply
     * their own reference type.
     *
     * @param deviceId device id
     * @param manager  owning manager
     * @param ref      loader of the session
     * @return a new reference
     */
    protected DeviceSessionRef newDeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, Mono<DeviceSession> ref) {
        DeviceSessionRef created = new DeviceSessionRef(deviceId, manager, ref);
        return created;
    }

    /**
     * Factory for references that wrap an already loaded session; subclasses may override it to
     * supply their own reference type.
     *
     * @param deviceId device id
     * @param manager  owning manager
     * @param ref      the loaded session
     * @return a new reference
     */
    protected DeviceSessionRef newDeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, DeviceSession ref) {
        DeviceSessionRef created = new DeviceSessionRef(deviceId, manager, ref);
        return created;
    }

    /** Sets the timeout applied when a session loader is subscribed (see {@code DeviceSessionRef.tryLoad}). */
    @Generated
    public void setSessionLoadTimeout(Duration sessionLoadTimeout) {
        this.sessionLoadTimeout = sessionLoadTimeout;
    }

    /** @return the timeout applied when a session loader is subscribed */
    @Generated
    public Duration getSessionLoadTimeout() {
        return this.sessionLoadTimeout;
    }

    /** @return the period of the session check scheduled by {@code init()} */
    @Generated
    public Duration getSessionCheckInterval() {
        return this.sessionCheckInterval;
    }

    /** Sets the period of the session check; the value is read when {@code init()} runs. */
    @Generated
    public void setSessionCheckInterval(Duration sessionCheckInterval) {
        this.sessionCheckInterval = sessionCheckInterval;
    }

    /** @return the initial delay of the session check scheduled by {@code init()} */
    @Generated
    public Duration getSessionCheckDelay() {
        return this.sessionCheckDelay;
    }

    /** Sets the initial delay of the session check; the value is read when {@code init()} runs. */
    @Generated
    public void setSessionCheckDelay(Duration sessionCheckDelay) {
        this.sessionCheckDelay = sessionCheckDelay;
    }

    /** Sets the maximum number of concurrent alive checks used by {@code checkSession()}. */
    @Generated
    public void setSessionCheckConcurrency(int sessionCheckConcurrency) {
        this.sessionCheckConcurrency = sessionCheckConcurrency;
    }

    /** Sets the number of in-progress closes above which {@code closeSession} defers work to the close sink. */
    @Generated
    public void setSessionCloseConcurrency(int sessionCloseConcurrency) {
        this.sessionCloseConcurrency = sessionCloseConcurrency;
    }

    protected static class DeviceSessionRef
    implements Disposable {
        /** Atomic accessor for {@link #loader}. */
        private static final AtomicReferenceFieldUpdater<DeviceSessionRef, Mono> LOADER = AtomicReferenceFieldUpdater.newUpdater(DeviceSessionRef.class, Mono.class, "loader");
        /** Pending session loader; set by {@code update} and taken (reset to null) by {@code tryLoad}. */
        protected volatile Mono<DeviceSession> loader;
        /** Atomic accessor for {@link #await}. */
        private static final AtomicReferenceFieldUpdater<DeviceSessionRef, Sinks.One> AWAIT = AtomicReferenceFieldUpdater.newUpdater(DeviceSessionRef.class, Sinks.One.class, "await");
        /** Sink created by {@code update} and completed (value, error or empty) when the pending load finishes. */
        private volatile Sinks.One<DeviceSession> await;
        /** Manager owning this reference. */
        private final AbstractDeviceSessionManager manager;
        /** Id of the device this reference belongs to. */
        public final String deviceId;
        /** Atomic accessor for {@link #loaded}. */
        private static final AtomicReferenceFieldUpdater<DeviceSessionRef, DeviceSession> LOADED = AtomicReferenceFieldUpdater.newUpdater(DeviceSessionRef.class, DeviceSession.class, "loaded");
        /** The currently loaded session, or null while nothing is loaded or after {@code close()}. */
        public volatile DeviceSession loaded;
        /** Ids of child device sessions attached to this one; created lazily by {@code children()}. */
        private volatile Set<String> children;
        /** Tracks the disposed state of this reference. */
        private final Disposable.Swap disposable = Disposables.swap();

        /**
         * Returns the set of child device ids, creating it on first use.
         *
         * @return the (concurrent) set of child ids, never {@code null}
         */
        public Set<String> children() {
            Set<String> current = this.children;
            if (current == null) {
                synchronized (this) {
                    current = this.children;
                    if (current == null) {
                        current = ConcurrentHashMap.newKeySet();
                        this.children = current;
                    }
                }
            }
            return current;
        }

        /**
         * Detaches a child device id; does nothing when no child was ever attached.
         *
         * @param id child device id
         */
        public void removeChild(String id) {
            Set<String> current = this.children;
            if (current == null) {
                return;
            }
            current.remove(id);
        }

        /**
         * Creates a reference whose session is produced by {@code ref}; the loader is installed
         * through {@link #update(Function)}.
         *
         * @param deviceId device id
         * @param manager  owning manager
         * @param ref      loader of the session
         */
        public DeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, Mono<DeviceSession> ref) {
            this.deviceId = deviceId;
            this.manager = manager;
            update(ignored -> ref);
        }

        /**
         * Creates a reference around an already loaded session; no loader is installed.
         *
         * @param deviceId device id
         * @param manager  owning manager
         * @param ref      the loaded session
         */
        public DeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, DeviceSession ref) {
            this.manager = manager;
            this.deviceId = deviceId;
            this.loaded = ref;
        }

        /**
         * Moves this device from the children of the old parent to the children of the new
         * parent. Parents without a local reference are ignored.
         *
         * @param from previous parent session
         * @param to   new parent session
         */
        private void handleParentChanged(DeviceSession from, DeviceSession to) {
            Map<String, DeviceSessionRef> sessions = manager.localSessions;
            DeviceSessionRef previousParent = sessions.get(from.getDeviceId());
            DeviceSessionRef currentParent = sessions.get(to.getDeviceId());
            if (previousParent != null) {
                previousParent.removeChild(deviceId);
            }
            if (currentParent != null) {
                currentParent.children().add(deviceId);
            }
        }

        /**
         * Applied to the session emitted by the loader (see {@code tryLoad}).
         *
         * @param session the freshly loaded session
         * @return the session to expose: the result of the manager's compute hooks, or - when this
         *         reference is already disposed - the session of the reference currently
         *         registered for the device (falling back to {@code session} itself)
         */
        private Mono<DeviceSession> handleLoaded(DeviceSession session) {
            // This reference was disposed while loading: do not register anything through it.
            if (this.isDisposed()) {
                DeviceSessionRef ref = this.manager.localSessions.get(this.deviceId);
                if (ref != null && ref != this) {
                    return ref.ref();
                }
                return Mono.just((Object)session);
            }
            // Publish the new session and remember the one it replaces.
            DeviceSession old = LOADED.getAndSet(this, session);
            // For a child session, attach this device to the parent's reference (if it is local).
            this.handleParent(session, (s, parent) -> parent.children().add(s.getDeviceId()));
            if (session.isWrapFrom(ChildrenDeviceSession.class)) {
                // keep the children sets in sync when the child is moved to another parent
                ((ChildrenDeviceSession)session.unwrap(ChildrenDeviceSession.class)).doOnParentChanged(this::handleParentChanged);
            }
            // First session of this reference: register it before running the compute hook.
            if (old == null) {
                return this.manager.doRegister(session).then(this.manager.handleSessionCompute0(null, session));
            }
            return this.manager.handleSessionCompute0(old, session);
        }

        /**
         * Closes this reference if a session is loaded and matches the predicate. The test and the
         * call to {@code close()} happen while holding this reference's monitor.
         *
         * @param test condition evaluated against the loaded session
         * @return number of closed sessions
         */
        protected Mono<Long> close(Predicate<DeviceSession> test) {
            synchronized (this) {
                DeviceSession current = this.loaded;
                boolean matched = current != null && test.test(current);
                return matched ? close() : Reactors.ALWAYS_ZERO_LONG;
            }
        }

        /**
         * Finalizes a successful load: closes a previously loaded session that differs from the
         * new one, stores the new session and hands it to whoever is waiting on the await sink.
         *
         * @param session the session to publish
         */
        private void afterLoaded(DeviceSession session) {
            DeviceSession previous = this.loaded;
            boolean replaced = previous != null && !session.equals(previous);
            if (replaced) {
                previous.close();
            }
            LOADED.set(this, session);
            @SuppressWarnings("unchecked")
            Sinks.One<DeviceSession> waiting = AWAIT.getAndSet(this, null);
            if (waiting == null) {
                return;
            }
            waiting.emitValue(session, Reactors.emitFailureHandler());
        }

        /**
         * Invokes {@code parent} with the session and the local reference of its parent device,
         * provided the session wraps a {@code ChildrenDeviceSession} and the parent is registered
         * locally; otherwise does nothing.
         *
         * @param session session to inspect, may be {@code null}
         * @param parent  action receiving the session and its parent's reference
         */
        protected void handleParent(DeviceSession session, BiConsumer<DeviceSession, DeviceSessionRef> parent) {
            if (session == null || !session.isWrapFrom(ChildrenDeviceSession.class)) {
                return;
            }
            ChildrenDeviceSession child = (ChildrenDeviceSession) session.unwrap(ChildrenDeviceSession.class);
            String parentId = child.getParent().getDeviceId();
            DeviceSessionRef parentRef = manager.localSessions.get(parentId);
            if (parentRef != null) {
                parent.accept(session, parentRef);
            }
        }

        /**
         * Runs the manager's alive check for every attached child device.
         *
         * @return a Mono completing when all children were checked; empty when there are none
         */
        protected Mono<Void> checkChildren() {
            Set<String> current = this.children;
            if (current == null) {
                return Mono.empty();
            }
            return Flux
                .fromIterable(current)
                .flatMap(manager::checkSessionAlive)
                .then();
        }

        /**
         * Unregisters this reference from the manager and closes its loaded session. The
         * reference is disposed in every case, even when nothing was closed.
         *
         * @return {@code 1} when a loaded session was closed, otherwise {@code 0}
         */
        private Mono<Long> close() {
            try {
                if (isDisposed()) {
                    return Reactors.ALWAYS_ZERO_LONG;
                }
                // only the reference that is actually registered may close the session
                if (!manager.localSessions.remove(deviceId, this)) {
                    return Reactors.ALWAYS_ZERO_LONG;
                }
                DeviceSession current = LOADED.getAndSet(this, null);
                return current == null ? Reactors.ALWAYS_ZERO_LONG : doClose(current);
            } finally {
                dispose();
            }
        }

        /**
         * Detaches the session from its parent's children, closes it through the manager and
         * then re-checks this reference's own children.
         *
         * @param session session to close
         * @return always {@code 1}
         */
        private Mono<Long> doClose(DeviceSession session) {
            handleParent(session, (child, parentRef) -> parentRef.removeChild(child.getDeviceId()));
            Mono<Void> closed = manager.closeSession(session);
            return closed
                .then(checkChildren())
                .then(Reactors.ALWAYS_ONE_LONG);
        }

        /**
         * Handles a failed load: closes the currently loaded session, unregisters this reference
         * from the manager and propagates the error to the await sink. When nobody is waiting the
         * error is logged instead.
         *
         * @param err the load failure
         */
        private void loadError(Throwable err) {
            // 'loaded' is volatile and close() may null it concurrently: read it exactly once
            DeviceSession current = this.loaded;
            if (current != null) {
                current.close();
            }
            this.manager.localSessions.remove(this.deviceId, this);
            @SuppressWarnings("unchecked")
            Sinks.One<DeviceSession> waiting = AWAIT.getAndSet(this, null);
            if (waiting != null) {
                waiting.emitError(err, Reactors.emitFailureHandler());
            } else {
                log.error("load device [{}] session error", this.deviceId, err);
            }
        }

        /**
         * Handles a load that produced no session: closes the currently loaded session,
         * unregisters this reference from the manager, completes the await sink empty and
         * disposes this reference.
         */
        private void loadEmpty() {
            // 'loaded' is volatile and close() may null it concurrently: read it exactly once
            DeviceSession current = this.loaded;
            if (current != null) {
                current.close();
            }
            this.manager.localSessions.remove(this.deviceId, this);
            @SuppressWarnings("unchecked")
            Sinks.One<DeviceSession> waiting = AWAIT.getAndSet(this, null);
            if (waiting != null) {
                waiting.emitEmpty(Reactors.emitFailureHandler());
            }
            this.dispose();
        }

        /**
         * Installs a new session loader derived from the current state of this reference.
         * The whole operation runs while holding this reference's monitor.
         *
         * @param updater receives the current source of the session and returns the new loader.
         *                The source is, in order of preference: the not-yet-subscribed loader, the
         *                Mono of the load currently awaited, or a Mono supplying the loaded session.
         */
        public void update(Function<Mono<DeviceSession>, Mono<DeviceSession>> updater) {
            DeviceSessionRef deviceSessionRef = this;
            synchronized (deviceSessionRef) {
                Sinks.One await;
                // take the pending loader so tryLoad cannot subscribe to it while it is being replaced
                Mono<DeviceSession> loader = LOADER.getAndSet(this, null);
                loader = loader != null ? updater.apply(loader) : ((await = AWAIT.get(this)) != null ? updater.apply((Mono<DeviceSession>)await.asMono()) : updater.apply((Mono<DeviceSession>)Mono.fromSupplier(() -> this.loaded)));
                // make sure there is a sink that callers arriving during the load can wait on
                AWAIT.compareAndSet(this, null, Sinks.one());
                // publish the new loader; the next tryLoad will subscribe to it
                LOADER.set(this, loader);
            }
        }

        /**
         * Resolves the session of this reference.
         * <p>
         * If a loader is pending, this call takes it (so no other caller can) and subscribes to it;
         * the outcome is reported through a fresh sink returned to the caller. Otherwise the caller
         * gets the Mono of the load in progress, or - when nothing is being loaded - a Mono
         * supplying the currently loaded session.
         *
         * @param ctx subscriber context of the caller; it is propagated to the loader together
         *            with this reference under the key {@code DeviceSessionRef.class}, which
         *            {@code ref()} and {@code removeLocalSession} use to detect re-entrant calls
         * @return the session of this reference
         */
        private Mono<DeviceSession> tryLoad(ContextView ctx) {
            // atomically claim the pending loader
            Mono loader = LOADER.getAndSet(this, null);
            if (loader != null) {
                Sinks.One async = Sinks.one();
                // Pipeline: handleLoaded on a value, loadEmpty when the loader is empty, and a
                // TimeoutException after manager.sessionLoadTimeout. Subscriber callbacks:
                // value -> afterLoaded + emit the stored session; error -> loadError + emit the
                // error; completion -> emit empty.
                // NOTE(review): completion follows a value as well; the second emission on 'async'
                // is left to Reactors.emitFailureHandler() - confirm it is ignored there.
                loader.flatMap(this::handleLoaded).switchIfEmpty(Mono.fromRunnable(this::loadEmpty)).timeout(this.manager.sessionLoadTimeout, Mono.error(() -> new TimeoutException("device [" + this.deviceId + "] session load timeout"))).subscribe(loaded -> {
                    this.afterLoaded((DeviceSession)loaded);
                    async.emitValue((Object)LOADED.get(this), Reactors.emitFailureHandler());
                }, err -> {
                    this.loadError((Throwable)err);
                    async.emitError(err, Reactors.emitFailureHandler());
                }, () -> async.emitEmpty(Reactors.emitFailureHandler()), Context.of((ContextView)ctx).put(DeviceSessionRef.class, (Object)this));
                return async.asMono();
            }
            // no pending loader: either a load is in flight (wait for it) or the session is settled
            Sinks.One sink = AWAIT.get(this);
            if (sink == null) {
                return Mono.fromSupplier(() -> this.loaded);
            }
            return sink.asMono();
        }

        /**
         * Returns the session if it is alive. A loaded session that reports it is not alive is
         * closed and the result is empty. When no session is loaded yet, the reference is
         * resolved first and the check is repeated.
         *
         * @return the alive session, or empty
         */
        public Mono<DeviceSession> checkSessionAlive() {
            if (isDisposed()) {
                return Mono.empty();
            }
            DeviceSession session = this.loaded;
            if (session == null) {
                return ref().flatMap(ignore -> checkSessionAlive());
            }
            return session
                .isAliveAsync()
                .flatMap(alive -> {
                    if (alive) {
                        return Mono.just(session);
                    }
                    return close().then(Mono.<DeviceSession>empty());
                });
        }

        /**
         * Resolves the session of this reference. When called from within this reference's own
         * loading pipeline (detected through the subscriber context) the currently loaded session
         * is returned directly instead of triggering another load; a warning is logged if there
         * is none.
         *
         * @return the session, or empty
         */
        public Mono<DeviceSession> ref() {
            return Mono.deferContextual(ctx -> {
                boolean recursive = ctx.getOrEmpty(DeviceSessionRef.class).orElse(null) == this;
                if (!recursive) {
                    return tryLoad(ctx);
                }
                return Mono.fromSupplier(() -> {
                    DeviceSession current = this.loaded;
                    if (current == null) {
                        log.warn("recursive call get device session [{}]", (Object) deviceId);
                    }
                    return current;
                });
            });
        }

        /** Completes a pending await sink empty and marks this reference as disposed. */
        @Override
        public void dispose() {
            @SuppressWarnings("unchecked")
            Sinks.One<DeviceSession> waiting = AWAIT.getAndSet(this, null);
            if (waiting != null) {
                waiting.emitEmpty(Reactors.emitFailureHandler());
            }
            disposable.dispose();
        }

        /** @return {@code true} once {@link #dispose()} has been called */
        @Override
        public boolean isDisposed() {
            return disposable.isDisposed();
        }
    }
}

