/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.device.session;

import com.google.common.collect.Maps;
import io.scalecube.services.annotations.ServiceMethod;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import lombok.Generated;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.device.session.DeviceSessionInfo;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.supports.device.session.AbstractDeviceSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

public class ClusterDeviceSessionManager
extends AbstractDeviceSessionManager {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ClusterDeviceSessionManager.class);
    private final RpcManager rpcManager;
    private final Map<String, Service> services = new NonBlockingHashMap();
    private Duration syncInterval = Duration.ZERO;
    private Duration syncDelay = Duration.ofMinutes(1L);

    public ClusterDeviceSessionManager(RpcManager rpcManager) {
        this.rpcManager = rpcManager;
    }

    @Override
    public void init() {
        super.init();
        this.disposable.add(this.rpcManager.registerService((Object)new ServiceImpl(() -> this)));
        this.rpcManager.getServices(Service.class).subscribe(service -> this.addService(service.serverNodeId(), (Service)service.service()));
        this.rpcManager.listen(Service.class).subscribe(e -> {
            if (e.getType() == ServiceEvent.Type.removed) {
                this.services.remove(e.getServerNodeId());
            } else if (e.getType() == ServiceEvent.Type.added) {
                this.rpcManager.getService(e.getServerNodeId(), Service.class).subscribe(service -> this.addService(e.getServerNodeId(), (Service)service));
            }
        });
        if (!this.syncInterval.isZero() && !this.syncInterval.isNegative()) {
            this.disposable.add(Flux.interval((Duration)this.syncDelay, (Duration)this.syncInterval).onBackpressureDrop().concatMap(times -> this.doSync().onErrorResume(err -> {
                log.warn("interval sync device session failed", err);
                return Mono.empty();
            })).subscribe());
        }
    }

    private Mono<Void> doSync(AbstractDeviceSessionManager.DeviceSessionRef session) {
        DeviceSession loaded = session.loaded;
        if (loaded == null) {
            return Mono.empty();
        }
        DeviceSessionInfo info = DeviceSessionInfo.of((String)this.rpcManager.currentServerId(), (DeviceSession)loaded);
        log.debug("sync device session {} {}", (Object)info.getDeviceId(), (Object)loaded);
        return this.getServices().flatMap(service -> service.sync(info)).then();
    }

    protected Mono<Void> doSync() {
        log.info("start sync device sessions");
        AtomicLong cnt = new AtomicLong();
        return Flux.fromIterable(this.localSessions.values()).filter(ref -> ref instanceof ClusterDeviceSessionRef && ((ClusterDeviceSessionRef)ref).needSync()).flatMap(ref -> {
            cnt.incrementAndGet();
            return this.doSync((AbstractDeviceSessionManager.DeviceSessionRef)ref).onErrorResume(err -> {
                log.warn("sync device {} session failed ", (Object)ref.deviceId, err);
                return Mono.empty();
            });
        }, 16).then(Mono.fromRunnable(() -> log.info("sync device sessions complete. sessions:{}", (Object)cnt)));
    }

    @Override
    protected ClusterDeviceSessionRef newDeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, Mono<DeviceSession> ref) {
        return new ClusterDeviceSessionRef(deviceId, manager, ref);
    }

    @Override
    protected ClusterDeviceSessionRef newDeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, DeviceSession ref) {
        return new ClusterDeviceSessionRef(deviceId, manager, ref);
    }

    @Override
    public boolean isShutdown() {
        return super.isShutdown() || this.rpcManager.isShutdown();
    }

    private void addService(String serverId, Service rpc) {
        this.services.put(serverId, new ErrorHandleService(serverId, rpc));
    }

    @Override
    public final String getCurrentServerId() {
        return this.rpcManager.currentServerId();
    }

    @Override
    protected final Mono<Boolean> initSessionConnection(DeviceSession session) {
        if (this.services.isEmpty()) {
            return Reactors.ALWAYS_FALSE;
        }
        return this.getServices().concatMap(service -> service.init(session.getDeviceId())).takeUntil(Boolean::booleanValue).any(Boolean::booleanValue);
    }

    @Override
    protected final Mono<Long> removeRemoteSession(String deviceId) {
        if (this.services.isEmpty()) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        return Mono.deferContextual(ctx -> {
            Service.SessionOperation operation = Service.SessionOperation.of(deviceId, ctx);
            return this.getServices().concatMap(service -> service.remove0(operation)).reduce(Math::addExact);
        });
    }

    @Override
    protected final Mono<Long> getRemoteTotalSessions() {
        if (this.services.isEmpty()) {
            return Reactors.ALWAYS_ZERO_LONG;
        }
        return this.getServices().flatMap(Service::total).reduce(Math::addExact);
    }

    @Override
    protected final Mono<Boolean> remoteSessionIsAlive(String deviceId) {
        if (this.services.isEmpty()) {
            return Reactors.ALWAYS_FALSE;
        }
        return this.getServices().flatMap(service -> service.isAlive(deviceId)).any(Boolean::booleanValue).defaultIfEmpty((Object)false);
    }

    @Override
    protected Mono<Boolean> checkRemoteSessionIsAlive(String deviceId) {
        if (this.services.isEmpty()) {
            return Reactors.ALWAYS_FALSE;
        }
        return this.getServices().flatMap(service -> service.checkAlive(deviceId)).any(Boolean::booleanValue).defaultIfEmpty((Object)false);
    }

    @Override
    protected Flux<DeviceSessionInfo> remoteSessions(String serverId) {
        if (ObjectUtils.isEmpty((Object)serverId)) {
            return this.getServices().flatMap(Service::sessions);
        }
        Service service = this.services.get(serverId);
        return service == null ? Flux.empty() : service.sessions();
    }

    private Flux<Service> getServices() {
        return Flux.fromIterable(this.services.values());
    }

    @Generated
    public Duration getSyncInterval() {
        return this.syncInterval;
    }

    @Generated
    public void setSyncInterval(Duration syncInterval) {
        this.syncInterval = syncInterval;
    }

    @Generated
    public Duration getSyncDelay() {
        return this.syncDelay;
    }

    @Generated
    public void setSyncDelay(Duration syncDelay) {
        this.syncDelay = syncDelay;
    }

    public static class ServiceImpl
    implements Service {
        private final Supplier<AbstractDeviceSessionManager> managerSupplier;

        private <T, Arg0> T doWith(Arg0 arg0, BiFunction<AbstractDeviceSessionManager, Arg0, T> arg, T defaultValue) {
            AbstractDeviceSessionManager manager = this.managerSupplier.get();
            if (manager == null) {
                return defaultValue;
            }
            return arg.apply(manager, arg0);
        }

        @Override
        public Mono<Long> remove0(Service.SessionOperation operation) {
            return this.doWith(operation.deviceId, AbstractDeviceSessionManager::removeFromCluster, Reactors.ALWAYS_ZERO_LONG).contextWrite((ContextView)operation.toContext());
        }

        @Override
        public Mono<Boolean> checkAlive(String deviceId) {
            return this.doWith(deviceId, (manager, id) -> manager.checkLocalAlive(deviceId), Reactors.ALWAYS_FALSE);
        }

        @Override
        public Mono<Boolean> isAlive(String deviceId) {
            return this.doWith(deviceId, (manager, id) -> {
                AbstractDeviceSessionManager.DeviceSessionRef ref = manager.localSessions.get(deviceId);
                if (ref == null) {
                    return Reactors.ALWAYS_FALSE;
                }
                if (ref.loaded == null) {
                    return Reactors.ALWAYS_TRUE;
                }
                return ref.loaded.isAliveAsync();
            }, Reactors.ALWAYS_FALSE);
        }

        @Override
        public Mono<Long> total() {
            return this.doWith(null, (manager, nil) -> manager.totalSessions(true), Reactors.ALWAYS_ZERO_LONG);
        }

        @Override
        public Mono<Boolean> init(String deviceId) {
            return this.doWith(deviceId, AbstractDeviceSessionManager::doInit, Reactors.ALWAYS_FALSE);
        }

        @Override
        public Mono<Long> remove(String deviceId) {
            return this.doWith(deviceId, AbstractDeviceSessionManager::removeFromCluster, Reactors.ALWAYS_ZERO_LONG);
        }

        @Override
        public Flux<DeviceSessionInfo> sessions() {
            return this.doWith(null, (manager, ignore) -> manager.getLocalSessionInfo(), Flux.empty());
        }

        @Override
        public Mono<Void> sync(DeviceSessionInfo info) {
            return this.doWith(info, (manager, session) -> {
                DeviceSession loaded;
                AbstractDeviceSessionManager.DeviceSessionRef ref = manager.localSessions.get(session.getDeviceId());
                if (ref != null && (loaded = ref.loaded) != null) {
                    loaded.keepAlive(Math.max(session.getLastCommTime(), loaded.lastPingTime()));
                }
                return Mono.empty();
            }, Mono.empty());
        }

        @Generated
        public ServiceImpl(Supplier<AbstractDeviceSessionManager> managerSupplier) {
            this.managerSupplier = managerSupplier;
        }
    }

    @io.scalecube.services.annotations.Service
    public static interface Service {
        @ServiceMethod
        public Mono<Boolean> isAlive(String var1);

        @ServiceMethod
        public Mono<Boolean> checkAlive(String var1);

        @ServiceMethod
        public Mono<Long> total();

        @ServiceMethod
        public Mono<Boolean> init(String var1);

        @ServiceMethod
        public Mono<Long> remove(String var1);

        @ServiceMethod
        public Mono<Long> remove0(SessionOperation var1);

        @ServiceMethod
        public Flux<DeviceSessionInfo> sessions();

        @ServiceMethod
        public Mono<Void> sync(DeviceSessionInfo var1);

        public static class SessionOperation
        implements Externalizable {
            private String deviceId;
            private Map<String, Object> context;

            public static SessionOperation of(String deviceId, ContextView ctx) {
                SessionOperation opt = new SessionOperation();
                opt.deviceId = deviceId;
                return opt.with(ctx);
            }

            @Override
            public void writeExternal(ObjectOutput out) throws IOException {
                out.writeUTF(this.deviceId);
                SerializeUtils.writeKeyValue(this.context, (ObjectOutput)out);
            }

            @Override
            public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
                this.deviceId = in.readUTF();
                this.context = SerializeUtils.readMap((ObjectInput)in, Maps::newHashMapWithExpectedSize);
            }

            public Context toContext() {
                if (this.context == null) {
                    return Context.empty();
                }
                Object msg = this.context.get("@msg");
                if (msg instanceof DeviceMessage) {
                    return Context.of(DeviceMessage.class, (Object)msg);
                }
                if (msg instanceof Map) {
                    return MessageType.convertMessage((Map)((Map)msg)).map(_msg -> Context.of(DeviceMessage.class, (Object)_msg)).orElse(Context.empty());
                }
                return Context.empty();
            }

            public SessionOperation with(ContextView view) {
                if (this.context == null) {
                    this.context = Maps.newHashMapWithExpectedSize((int)1);
                }
                view.getOrEmpty(DeviceMessage.class).ifPresent(msg -> this.context.put("@msg", msg));
                return this;
            }

            @Generated
            public String getDeviceId() {
                return this.deviceId;
            }

            @Generated
            public Map<String, Object> getContext() {
                return this.context;
            }

            @Generated
            public void setDeviceId(String deviceId) {
                this.deviceId = deviceId;
            }

            @Generated
            public void setContext(Map<String, Object> context) {
                this.context = context;
            }
        }
    }

    protected static class ClusterDeviceSessionRef
    extends AbstractDeviceSessionManager.DeviceSessionRef {
        static final AtomicLongFieldUpdater<ClusterDeviceSessionRef> LAST_SYNC = AtomicLongFieldUpdater.newUpdater(ClusterDeviceSessionRef.class, "lastSync");
        private volatile long lastSync;

        public ClusterDeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, Mono<DeviceSession> ref) {
            super(deviceId, manager, ref);
        }

        public ClusterDeviceSessionRef(String deviceId, AbstractDeviceSessionManager manager, DeviceSession ref) {
            super(deviceId, manager, ref);
        }

        public boolean needSync() {
            DeviceSession session = this.loaded;
            if (session == null) {
                return false;
            }
            long lastTime = session.lastPingTime();
            return session instanceof PersistentSession && LAST_SYNC.getAndSet(this, lastTime) != lastTime;
        }
    }

    static class ErrorHandleService
    implements Service {
        private final String id;
        private final Service service;
        private static final Mono<Boolean> defaultAlive = Boolean.getBoolean("jetlinks.session.cluster.alive-when-failed") ? Reactors.ALWAYS_TRUE : Reactors.ALWAYS_FALSE;

        private void handleError(Throwable error) {
            log.warn("cluster[{}] session manager is failed", (Object)this.id, (Object)error);
        }

        @Override
        public Mono<Long> remove0(Service.SessionOperation operation) {
            return this.service.remove0(operation).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_ZERO_LONG;
            });
        }

        @Override
        public Mono<Boolean> isAlive(String deviceId) {
            return this.service.isAlive(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return defaultAlive;
            });
        }

        @Override
        public Mono<Boolean> checkAlive(String deviceId) {
            return this.service.checkAlive(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return defaultAlive;
            });
        }

        @Override
        public Mono<Long> total() {
            return this.service.total().onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_ZERO_LONG;
            });
        }

        @Override
        public Mono<Boolean> init(String deviceId) {
            return this.service.init(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_FALSE;
            });
        }

        @Override
        public Mono<Long> remove(String deviceId) {
            return this.service.remove(deviceId).onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Reactors.ALWAYS_ZERO_LONG;
            });
        }

        @Override
        public Flux<DeviceSessionInfo> sessions() {
            return this.service.sessions().onErrorResume(err -> {
                this.handleError((Throwable)err);
                return Mono.empty();
            });
        }

        @Override
        public Mono<Void> sync(DeviceSessionInfo info) {
            return this.service.sync(info);
        }

        @Generated
        public ErrorHandleService(String id, Service service) {
            this.id = id;
            this.service = service;
        }
    }
}

