/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.cp.internal.session;

import com.hazelcast.cluster.Address;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService;
import com.hazelcast.cp.internal.RaftService;
import com.hazelcast.cp.internal.TermChangeAwareService;
import com.hazelcast.cp.internal.datastructures.spi.AbstractCPMigrationAwareService;
import com.hazelcast.cp.internal.operation.unsafe.UnsafeRaftReplicateOp;
import com.hazelcast.cp.internal.raft.QueryPolicy;
import com.hazelcast.cp.internal.raft.SnapshotAwareService;
import com.hazelcast.cp.internal.raft.impl.RaftNode;
import com.hazelcast.cp.internal.session.CPSessionInfo;
import com.hazelcast.cp.internal.session.RaftSessionRegistry;
import com.hazelcast.cp.internal.session.SessionAccessor;
import com.hazelcast.cp.internal.session.SessionAwareService;
import com.hazelcast.cp.internal.session.SessionResponse;
import com.hazelcast.cp.internal.session.operation.CloseInactiveSessionsOp;
import com.hazelcast.cp.internal.session.operation.CloseSessionOp;
import com.hazelcast.cp.internal.session.operation.ExpireSessionsOp;
import com.hazelcast.cp.internal.session.operation.GetSessionsOp;
import com.hazelcast.cp.internal.util.PartitionSpecificRunnableAdaptor;
import com.hazelcast.cp.session.CPSession;
import com.hazelcast.cp.session.CPSessionManagementService;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.util.BiTuple;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.executor.ManagedExecutorService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.impl.operationservice.impl.OperationServiceImpl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class RaftSessionService
extends AbstractCPMigrationAwareService
implements ManagedService,
SnapshotAwareService<RaftSessionRegistry>,
SessionAccessor,
TermChangeAwareService,
RaftNodeLifecycleAwareService,
CPSessionManagementService,
DynamicMetricsProvider {
    /** Name under which this service is registered. */
    public static final String SERVICE_NAME = "hz:core:raftSession";
    /** Initial delay and period (ms) of the task that looks for expired sessions; see {@code init}. */
    private static final long CHECK_EXPIRED_SESSIONS_TASK_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1L);
    /** Initial delay and period (ms) of the task that looks for inactive sessions; see {@code init}. */
    private static final long CHECK_INACTIVE_SESSIONS_TASK_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(30L);
    // NOTE(review): presumably the wait limit used while collecting inactive sessions --
    // the value matches the literal timeout passed to tryAcquire in getInactiveSessions; confirm.
    private static final long COLLECT_INACTIVE_SESSIONS_TASK_TIMEOUT_SECONDS = 5L;
    private final ILogger logger;
    // Assigned in init(); volatile because it is read from the scheduled tasks as well.
    private volatile RaftService raftService;
    /** Session registries keyed by the CP group they belong to. */
    private final Map<CPGroupId, RaftSessionRegistry> registries = new ConcurrentHashMap<CPGroupId, RaftSessionRegistry>();

    public RaftSessionService(NodeEngine nodeEngine) {
        super(nodeEngine);
        this.logger = nodeEngine.getLogger(this.getClass());
    }

    /**
     * Wires this service up: resolves the Raft service, hands itself to every
     * {@link SessionAwareService} as the session accessor, schedules the two
     * periodic session-maintenance tasks and registers itself as a metrics provider.
     *
     * @param nodeEngine node engine used for service lookup and scheduling
     * @param properties not used by this implementation
     */
    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.raftService = (RaftService) nodeEngine.getService("hz:core:raft");

        Collection<SessionAwareService> sessionAwareServices = nodeEngine.getServices(SessionAwareService.class);
        for (SessionAwareService sessionAware : sessionAwareServices) {
            sessionAware.setSessionAccessor(this);
        }

        ExecutionService scheduler = nodeEngine.getExecutionService();
        long expiryPeriod = CHECK_EXPIRED_SESSIONS_TASK_PERIOD_IN_MILLIS;
        scheduler.scheduleWithRepetition(new CheckSessionsToExpire(), expiryPeriod, expiryPeriod, TimeUnit.MILLISECONDS);
        long inactivityPeriod = CHECK_INACTIVE_SESSIONS_TASK_PERIOD_IN_MILLIS;
        scheduler.scheduleWithRepetition(new CheckInactiveSessions(), inactivityPeriod, inactivityPeriod, TimeUnit.MILLISECONDS);

        this.nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(this);
    }

    /** Intentionally a no-op: this service keeps no state that is reset here. */
    @Override
    public void reset() {
        // nothing to do
    }

    /**
     * Drops every session registry held by this service.
     *
     * @param terminate ignored; the registries are cleared either way
     */
    @Override
    public void shutdown(boolean terminate) {
        registries.clear();
    }

    /**
     * Returns a snapshot copy of the registry of the given group.
     *
     * @param groupId     CP group whose registry is snapshotted
     * @param commitIndex not used by this implementation
     * @return the registry's snapshot clone, or {@code null} when the group has no registry
     */
    @Override
    public RaftSessionRegistry takeSnapshot(CPGroupId groupId, long commitIndex) {
        RaftSessionRegistry current = this.registries.get(groupId);
        if (current == null) {
            return null;
        }
        return current.cloneForSnapshot();
    }

    /**
     * Installs the given registry for the group, replacing any existing one.
     * A {@code null} registry is ignored.
     *
     * @param groupId     CP group the snapshot belongs to
     * @param commitIndex not used by this implementation
     * @param registry    snapshot to install; may be {@code null}
     */
    @Override
    public void restoreSnapshot(CPGroupId groupId, long commitIndex, RaftSessionRegistry registry) {
        if (registry == null) {
            return;
        }
        this.registries.put(groupId, registry);
    }

    /**
     * On a new term commit, asks the group's registry (if any) to shift its
     * session expiration times, passing the configured heartbeat interval.
     *
     * @param groupId     CP group in which the new term was committed
     * @param commitIndex not used by this implementation
     */
    @Override
    public void onNewTermCommit(CPGroupId groupId, long commitIndex) {
        RaftSessionRegistry groupRegistry = this.registries.get(groupId);
        if (groupRegistry == null) {
            return;
        }
        groupRegistry.shiftExpirationTimes(this.getHeartbeatIntervalMillis());
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Session expiration times are shifted in " + groupId);
        }
    }

    /**
     * Forgets the session registry of a group whose Raft node has terminated.
     *
     * @param groupId CP group of the terminated Raft node
     */
    @Override
    public void onRaftNodeTerminated(CPGroupId groupId) {
        registries.remove(groupId);
    }

    /**
     * Intentionally a no-op: nothing is done when a Raft node steps down.
     *
     * @param groupId CP group of the Raft node that stepped down
     */
    @Override
    public void onRaftNodeSteppedDown(CPGroupId groupId) {
        // nothing to do
    }

    /**
     * Looks up the CP group by name and queries all of its sessions.
     *
     * <p>The returned future completes with the sessions of the group, or
     * exceptionally with the lookup/query failure. When no group with the
     * given name is found it completes exceptionally with an
     * {@link IllegalArgumentException} that names the missing group.
     *
     * @param groupName name of the CP group; must not be {@code "METADATA"}
     * @return future carrying the sessions of the group
     */
    public InternalCompletableFuture<Collection<CPSession>> getAllSessions(String groupName) {
        Preconditions.checkTrue(!"METADATA".equals(groupName), "Cannot query CP sessions on the METADATA CP group!");
        ManagedExecutorService executor = this.nodeEngine.getExecutionService().getExecutor("hz:system");
        InternalCompletableFuture<Collection<CPSession>> future = InternalCompletableFuture.withExecutor(executor);
        this.raftService.getCPGroup(groupName).whenCompleteAsync((group, t) -> {
            if (t != null) {
                future.completeExceptionally((Throwable)t);
                return;
            }
            if (group == null) {
                // Give the caller a message that says which group could not be found.
                future.completeExceptionally(new IllegalArgumentException("No CP group found with name: " + groupName));
                return;
            }
            this.getAllSessions(group.id()).whenCompleteAsync(InternalCompletableFuture.completingCallback(future));
        });
        return future;
    }

    /**
     * Issues a linearizable {@link GetSessionsOp} query against the given group.
     *
     * @param groupId CP group to query; its name must not be {@code "METADATA"}
     * @return future carrying the sessions of the group
     */
    public InternalCompletableFuture<Collection<CPSession>> getAllSessions(CPGroupId groupId) {
        boolean notMetadataGroup = !"METADATA".equals(groupId.getName());
        Preconditions.checkTrue(notMetadataGroup, "Cannot query CP sessions on the METADATA CP group!");
        GetSessionsOp query = new GetSessionsOp();
        return this.raftService.getInvocationManager().query(groupId, query, QueryPolicy.LINEARIZABLE);
    }

    /**
     * Looks up the CP group by name and invokes a {@link CloseSessionOp} for the session on it.
     *
     * <p>The returned future completes with the result of the close operation,
     * with {@code false} when no group with that name is found, or
     * exceptionally when the lookup or the invocation fails.
     *
     * @param groupName name of the CP group
     * @param sessionId id of the session to close
     * @return future carrying the outcome
     */
    public InternalCompletableFuture<Boolean> forceCloseSession(String groupName, long sessionId) {
        ManagedExecutorService systemExecutor = this.nodeEngine.getExecutionService().getExecutor("hz:system");
        InternalCompletableFuture<Boolean> result = InternalCompletableFuture.withExecutor(systemExecutor);
        this.raftService.getCPGroup(groupName).whenCompleteAsync((group, failure) -> {
            if (failure != null) {
                result.completeExceptionally((Throwable)failure);
                return;
            }
            if (group == null) {
                result.complete(false);
                return;
            }
            this.raftService.getInvocationManager()
                    .invoke(group.id(), new CloseSessionOp(sessionId))
                    .whenCompleteAsync(InternalCompletableFuture.completingCallback(result));
        });
        return result;
    }

    /**
     * Creates a session in the group's registry, creating the registry first if needed.
     *
     * @param groupId      CP group the session is created in
     * @param endpoint     address of the session owner
     * @param endpointName name of the session owner
     * @param endpointType kind of the session owner
     * @return response carrying the new session id, the session TTL and the heartbeat interval (both in ms)
     */
    public SessionResponse createNewSession(CPGroupId groupId, Address endpoint, String endpointName, CPSession.CPSessionOwnerType endpointType) {
        RaftSessionRegistry groupRegistry = this.getOrInitRegistry(groupId);
        long now = Clock.currentTimeMillis();
        long ttlMillis = this.getSessionTTLMillis();
        long newSessionId = groupRegistry.createNewSession(ttlMillis, endpoint, endpointName, endpointType, now);
        this.logger.info("Created new session: " + newSessionId + " in " + groupId + " for " + endpointType + " -> " + endpoint);
        long heartbeatMillis = this.getHeartbeatIntervalMillis();
        return new SessionResponse(newSessionId, ttlMillis, heartbeatMillis);
    }

    /**
     * Returns the registry of the group, creating and storing a new one when none exists yet.
     *
     * @param groupId CP group whose registry is requested
     * @return the existing or freshly created registry
     */
    private RaftSessionRegistry getOrInitRegistry(CPGroupId groupId) {
        RaftSessionRegistry existing = this.registries.get(groupId);
        if (existing != null) {
            return existing;
        }
        RaftSessionRegistry created = new RaftSessionRegistry(groupId);
        this.registries.put(groupId, created);
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Created new session registry for " + groupId);
        }
        return created;
    }

    /**
     * Forwards a heartbeat for the session to the group's registry, passing the configured session TTL.
     *
     * @param groupId   CP group the session lives in
     * @param sessionId id of the session sending the heartbeat
     * @throws IllegalStateException when the group has no session registry
     */
    @Override
    public void heartbeat(CPGroupId groupId, long sessionId) {
        RaftSessionRegistry groupRegistry = this.registries.get(groupId);
        if (groupRegistry == null) {
            String message = "No session: " + sessionId + " for CP group: " + groupId;
            throw new IllegalStateException(message);
        }
        long ttlMillis = this.getSessionTTLMillis();
        groupRegistry.heartbeat(sessionId, ttlMillis);
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Session: " + sessionId + " heartbeat in " + groupId);
        }
    }

    /**
     * Closes the session in the group's registry and, on success, notifies
     * every {@link SessionAwareService} about the closed session.
     *
     * @param groupId   CP group the session lives in
     * @param sessionId id of the session to close
     * @return {@code true} when the registry reported the session as closed;
     *         {@code false} when the group has no registry or the close was refused
     */
    public boolean closeSession(CPGroupId groupId, long sessionId) {
        RaftSessionRegistry groupRegistry = this.registries.get(groupId);
        if (groupRegistry == null || !groupRegistry.closeSession(sessionId)) {
            return false;
        }
        this.logger.info("Session: " + sessionId + " is closed in " + groupId);
        this.notifyServices(groupId, Collections.singleton(sessionId));
        return true;
    }

    /**
     * Tries to expire each given (session id, version) pair in the group's
     * registry and notifies the session-aware services about the sessions
     * the registry actually expired.
     *
     * @param groupId          CP group the sessions live in
     * @param sessionsToExpire pairs of session id (element1) and version (element2)
     */
    public void expireSessions(CPGroupId groupId, Collection<BiTuple<Long, Long>> sessionsToExpire) {
        RaftSessionRegistry groupRegistry = this.registries.get(groupId);
        if (groupRegistry == null) {
            return;
        }
        ArrayList<Long> expiredIds = new ArrayList<Long>();
        for (BiTuple<Long, Long> candidate : sessionsToExpire) {
            long sessionId = candidate.element1;
            long version = candidate.element2;
            if (groupRegistry.expireSession(sessionId, version)) {
                expiredIds.add(sessionId);
            }
        }
        if (expiredIds.isEmpty()) {
            return;
        }
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Sessions: " + expiredIds + " are expired in " + groupId);
        }
        this.notifyServices(groupId, expiredIds);
    }

    /**
     * Closes the given sessions unless a {@link SessionAwareService} still
     * reports them as attached, then notifies the services about the sessions
     * that were closed.
     *
     * @param groupId          CP group the sessions live in
     * @param inactiveSessions ids of the sessions considered inactive by the caller
     */
    public void closeInactiveSessions(CPGroupId groupId, Collection<Long> inactiveSessions) {
        RaftSessionRegistry registry = this.registries.get(groupId);
        if (registry == null) {
            return;
        }
        // Work on a copy so the caller's collection is never modified.
        HashSet<Long> closed = new HashSet<Long>(inactiveSessions);
        for (SessionAwareService service : this.nodeEngine.getServices(SessionAwareService.class)) {
            // A session that got attached to a resource in the meantime must stay open.
            closed.removeAll(service.getAttachedSessions(groupId));
        }
        if (closed.isEmpty()) {
            return;
        }
        // Typed iteration over the ids (the previous iterator was declared with the wrong element type).
        for (long sessionId : closed) {
            registry.closeSession(sessionId);
        }
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Inactive sessions: " + closed + " are closed in " + groupId);
        }
        this.notifyServices(groupId, closed);
    }

    /**
     * Asks the group's registry (created on demand) for a new thread id.
     *
     * @param groupId CP group for which the id is generated
     * @return the id produced by the registry
     */
    public long generateThreadId(CPGroupId groupId) {
        RaftSessionRegistry groupRegistry = this.getOrInitRegistry(groupId);
        return groupRegistry.generateThreadId();
    }

    /**
     * Returns the sessions held locally for the group without going through Raft.
     *
     * @param groupId CP group to inspect
     * @return unmodifiable view of the registry's sessions, or an empty list when the group has no registry
     */
    public Collection<CPSession> getSessionsLocally(CPGroupId groupId) {
        RaftSessionRegistry groupRegistry = this.getSessionRegistryOrNull(groupId);
        if (groupRegistry != null) {
            return Collections.unmodifiableCollection(groupRegistry.getSessions());
        }
        return Collections.emptyList();
    }

    /**
     * Plain lookup of the group's registry.
     *
     * @param groupId CP group to look up
     * @return the registry, or {@code null} when none is stored for the group
     */
    RaftSessionRegistry getSessionRegistryOrNull(CPGroupId groupId) {
        RaftSessionRegistry found = registries.get(groupId);
        return found;
    }

    /** @return the configured session heartbeat interval, converted from seconds to milliseconds */
    private long getHeartbeatIntervalMillis() {
        long intervalSeconds = this.raftService.getConfig().getSessionHeartbeatIntervalSeconds();
        return TimeUnit.SECONDS.toMillis(intervalSeconds);
    }

    /** @return the configured session time-to-live, converted from seconds to milliseconds */
    private long getSessionTTLMillis() {
        long ttlSeconds = this.raftService.getConfig().getSessionTimeToLiveSeconds();
        return TimeUnit.SECONDS.toMillis(ttlSeconds);
    }

    /**
     * Calls {@code onSessionClose} on every {@link SessionAwareService} for each
     * of the given sessions, service by service.
     *
     * @param groupId    CP group the sessions belonged to
     * @param sessionIds ids of the closed sessions
     */
    private void notifyServices(CPGroupId groupId, Collection<Long> sessionIds) {
        for (SessionAwareService listener : this.nodeEngine.getServices(SessionAwareService.class)) {
            for (long closedId : sessionIds) {
                listener.onSessionClose(groupId, closedId);
            }
        }
    }

    /**
     * Tells whether the session is currently present in the group's registry.
     *
     * @param groupId   CP group to inspect
     * @param sessionId id of the session
     * @return {@code true} when the group has a registry and it contains the session
     */
    @Override
    public boolean isActive(CPGroupId groupId, long sessionId) {
        RaftSessionRegistry groupRegistry = this.registries.get(groupId);
        return groupRegistry != null && groupRegistry.getSession(sessionId) != null;
    }

    /**
     * Collects, per CP group, the sessions each registry reports as due for expiration.
     * Groups with nothing to expire are left out of the result.
     *
     * @return map from group id to the registry's sessions-to-expire collection
     */
    private Map<CPGroupId, Collection<BiTuple<Long, Long>>> getSessionsToExpire() {
        Map<CPGroupId, Collection<BiTuple<Long, Long>>> dueByGroup = new HashMap<CPGroupId, Collection<BiTuple<Long, Long>>>();
        for (RaftSessionRegistry groupRegistry : this.registries.values()) {
            Collection<BiTuple<Long, Long>> due = groupRegistry.getSessionsToExpire();
            if (!due.isEmpty()) {
                dueByGroup.put(groupRegistry.groupId(), due);
            }
        }
        return dueByGroup;
    }

    /**
     * Collects, per CP group, the sessions that no {@link SessionAwareService}
     * reports as attached and whose creation time is older than the session TTL.
     *
     * <p>One task per registry is submitted to the operation service, bound to
     * the group's partition id. The caller waits up to
     * {@code COLLECT_INACTIVE_SESSIONS_TASK_TIMEOUT_SECONDS} for all tasks; on
     * timeout or interruption whatever has been collected so far is returned.
     *
     * @return map from group id to the ids of its inactive sessions; groups without any are omitted
     */
    private Map<CPGroupId, Collection<Long>> getInactiveSessions() {
        ConcurrentHashMap<CPGroupId, Collection<Long>> response = new ConcurrentHashMap<CPGroupId, Collection<Long>>();
        Semaphore semaphore = new Semaphore(0);
        OperationServiceImpl operationService = this.nodeEngine.getOperationService();
        ArrayList<RaftSessionRegistry> registries = new ArrayList<RaftSessionRegistry>(this.registries.values());
        for (RaftSessionRegistry registry : registries) {
            CPGroupId groupId = registry.groupId();
            operationService.execute(new PartitionSpecificRunnableAdaptor(() -> {
                try {
                    HashSet<Long> activeSessionIds = new HashSet<Long>();
                    for (SessionAwareService service : this.nodeEngine.getServices(SessionAwareService.class)) {
                        activeSessionIds.addAll(service.getAttachedSessions(groupId));
                    }
                    HashSet<Long> inactiveSessionIds = new HashSet<Long>();
                    for (CPSession session : registry.getSessions()) {
                        boolean attached = activeSessionIds.contains(session.id());
                        if (!attached && session.creationTime() + this.getSessionTTLMillis() < Clock.currentTimeMillis()) {
                            inactiveSessionIds.add(session.id());
                        }
                    }
                    if (!inactiveSessionIds.isEmpty()) {
                        response.put(groupId, inactiveSessionIds);
                    }
                } finally {
                    // Always hand back the permit, otherwise a failing task makes the
                    // caller wait for the whole timeout.
                    semaphore.release();
                }
            }, this.nodeEngine.getPartitionService().getPartitionId(groupId)));
        }
        try {
            boolean allDone = semaphore.tryAcquire(registries.size(), COLLECT_INACTIVE_SESSIONS_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!allDone && this.logger.isFineEnabled()) {
                this.logger.fine("Timed out while collecting inactive sessions; returning partial result");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return response;
    }

    /** @return the fixed backup count of this service, which is 1 */
    @Override
    protected int getBackupCount() {
        final int backupCount = 1;
        return backupCount;
    }

    /**
     * Builds the snapshot map for one partition: a snapshot of every registry
     * whose CP group maps to the given partition id.
     *
     * <p>A group whose registry disappears between listing the keys and taking
     * the snapshot is skipped instead of being stored as a {@code null} value
     * (which {@code Collectors.toMap} would reject with a
     * {@link NullPointerException}).
     *
     * @param partitionId partition whose groups are snapshotted
     * @return map from group id to the snapshot of its session registry
     */
    @Override
    protected Map<CPGroupId, Object> getSnapshotMap(int partitionId) {
        int partitionCount = this.nodeEngine.getPartitionService().getPartitionCount();
        Map<CPGroupId, Object> snapshots = new HashMap<CPGroupId, Object>();
        for (CPGroupId groupId : this.registries.keySet()) {
            if (RaftService.getCPGroupPartitionId(groupId, partitionCount) != partitionId) {
                continue;
            }
            RaftSessionRegistry snapshot = this.takeSnapshot(groupId, 0L);
            if (snapshot != null) {
                snapshots.put(groupId, snapshot);
            }
        }
        return snapshots;
    }

    /**
     * Removes the registries of all CP groups that the Raft service maps to the given partition.
     *
     * @param partitionId partition whose registries are dropped
     */
    @Override
    protected void clearPartitionReplica(int partitionId) {
        Iterator<CPGroupId> groupIds = this.registries.keySet().iterator();
        while (groupIds.hasNext()) {
            CPGroupId groupId = groupIds.next();
            if (this.raftService.getCPGroupPartitionId(groupId) == partitionId) {
                groupIds.remove();
            }
        }
    }

    /**
     * Publishes five metrics per session under the {@code cp.session} prefix:
     * {@code endpoint} and {@code endpointType} (value 0, the information is
     * carried in a tag), {@code version}, {@code creationTime} and
     * {@code expirationTime} (both in ms).
     *
     * @param descriptor base descriptor the session descriptors are derived from
     * @param context    sink the metrics are collected into
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        MetricDescriptor sessionRoot = descriptor.withPrefix("cp.session");
        for (RaftSessionRegistry groupRegistry : this.registries.values()) {
            CPGroupId groupId = groupRegistry.groupId();
            for (CPSession session : groupRegistry.getSessions()) {
                String discriminator = session.id() + "@" + groupId.getName();
                MetricDescriptor base = root(sessionRoot)
                        .withDiscriminator("id", discriminator)
                        .withTag("sessionId", String.valueOf(session.id()))
                        .withTag("group", groupId.getName());

                MetricDescriptor endpoint = base.copy().withTag("endpoint", session.endpoint().toString()).withMetric("endpoint");
                context.collect(endpoint, 0L);

                MetricDescriptor endpointType = base.copy().withTag("endpointType", session.endpointType().toString()).withMetric("endpointType");
                context.collect(endpointType, 0L);

                MetricDescriptor version = base.copy().withMetric("version");
                context.collect(version, session.version());

                MetricDescriptor creationTime = base.copy().withUnit(ProbeUnit.MS).withMetric("creationTime");
                context.collect(creationTime, session.creationTime());

                MetricDescriptor expirationTime = base.copy().withUnit(ProbeUnit.MS).withMetric("expirationTime");
                context.collect(expirationTime, session.expirationTime());
            }
        }
    }

    /** Returns a fresh copy of the shared root descriptor so per-session tags never leak between sessions. */
    private static MetricDescriptor root(MetricDescriptor sessionRoot) {
        return sessionRoot.copy();
    }

    /**
     * Periodic task that collects inactive sessions per CP group and asks the
     * group to close them, through the local Raft node when the CP subsystem
     * is enabled and through the partition owner otherwise. Failures are only
     * logged at fine level; the task is best-effort.
     */
    private class CheckInactiveSessions
    implements Runnable {
        private CheckInactiveSessions() {
        }

        @Override
        public void run() {
            Map<CPGroupId, Collection<Long>> inactiveSessions = RaftSessionService.this.getInactiveSessions();
            for (Map.Entry<CPGroupId, Collection<Long>> entry : inactiveSessions.entrySet()) {
                CPGroupId groupId = entry.getKey();
                Collection<Long> sessions = entry.getValue();
                if (RaftSessionService.this.raftService.isCpSubsystemEnabled()) {
                    this.closeOnRaft(groupId, sessions);
                } else {
                    this.closeOnPartitionOwner(groupId, sessions);
                }
            }
        }

        /**
         * Replicates a {@link CloseInactiveSessionsOp} through the local Raft node
         * of the group and waits for it; does nothing when there is no local node.
         */
        private void closeOnRaft(CPGroupId groupId, Collection<Long> sessions) {
            RaftNode raftNode = RaftSessionService.this.raftService.getRaftNode(groupId);
            if (raftNode == null) {
                return;
            }
            try {
                raftNode.replicate(new CloseInactiveSessionsOp(sessions)).get();
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Do not lose the interruption just because the failure is swallowed.
                    Thread.currentThread().interrupt();
                }
                this.logFailure(groupId, sessions, e);
            }
        }

        /**
         * Sends the {@link CloseInactiveSessionsOp}, wrapped in an
         * {@link UnsafeRaftReplicateOp}, via {@code invokeOnPartition} and waits for it.
         */
        private void closeOnPartitionOwner(CPGroupId groupId, Collection<Long> sessions) {
            InternalCompletableFuture<?> future = RaftSessionService.this.raftService.getInvocationManager().invokeOnPartition(new UnsafeRaftReplicateOp(groupId, new CloseInactiveSessionsOp(sessions)));
            try {
                future.join();
            }
            catch (Exception e) {
                this.logFailure(groupId, sessions, e);
            }
        }

        /** Logs a failed close attempt at fine level. */
        private void logFailure(CPGroupId groupId, Collection<Long> sessions, Exception e) {
            if (RaftSessionService.this.logger.isFineEnabled()) {
                RaftSessionService.this.logger.fine("Could not close inactive sessions: " + sessions + " of " + groupId, e);
            }
        }
    }

    /**
     * Periodic task that collects the sessions due for expiration per CP group
     * and asks the group to expire them, through the local Raft node when the
     * CP subsystem is enabled and through the partition owner otherwise.
     * Failures are only logged at fine level; the task is best-effort.
     */
    private class CheckSessionsToExpire
    implements Runnable {
        private CheckSessionsToExpire() {
        }

        @Override
        public void run() {
            Map<CPGroupId, Collection<BiTuple<Long, Long>>> sessionsToExpire = RaftSessionService.this.getSessionsToExpire();
            for (Map.Entry<CPGroupId, Collection<BiTuple<Long, Long>>> entry : sessionsToExpire.entrySet()) {
                CPGroupId groupId = entry.getKey();
                Collection<BiTuple<Long, Long>> sessions = entry.getValue();
                if (RaftSessionService.this.raftService.isCpSubsystemEnabled()) {
                    this.expireOnRaftNode(groupId, sessions);
                } else {
                    this.expireOnPartitionOwner(groupId, sessions);
                }
            }
        }

        /**
         * Replicates an {@link ExpireSessionsOp} through the local Raft node of
         * the group and waits for it; does nothing when there is no local node.
         */
        private void expireOnRaftNode(CPGroupId groupId, Collection<BiTuple<Long, Long>> sessions) {
            RaftNode raftNode = RaftSessionService.this.raftService.getRaftNode(groupId);
            if (raftNode == null) {
                return;
            }
            try {
                raftNode.replicate(new ExpireSessionsOp(sessions)).get();
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Do not lose the interruption just because the failure is swallowed.
                    Thread.currentThread().interrupt();
                }
                this.logFailure(groupId, sessions, e);
            }
        }

        /**
         * Sends the {@link ExpireSessionsOp}, wrapped in an
         * {@link UnsafeRaftReplicateOp}, via {@code invokeOnPartition} and waits for it.
         */
        private void expireOnPartitionOwner(CPGroupId groupId, Collection<BiTuple<Long, Long>> sessions) {
            InternalCompletableFuture<?> future = RaftSessionService.this.raftService.getInvocationManager().invokeOnPartition(new UnsafeRaftReplicateOp(groupId, new ExpireSessionsOp(sessions)));
            try {
                future.join();
            }
            catch (Exception e) {
                this.logFailure(groupId, sessions, e);
            }
        }

        /** Logs a failed expiration attempt at fine level. */
        private void logFailure(CPGroupId groupId, Collection<BiTuple<Long, Long>> sessions, Exception e) {
            if (RaftSessionService.this.logger.isFineEnabled()) {
                RaftSessionService.this.logger.fine("Could not invalidate sessions: " + sessions + " of " + groupId, e);
            }
        }
    }
}

