/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.worker.CriticalWorker;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
import org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration;
import org.jetbrains.annotations.Nullable;

/**
 * Component that exposes Netty event executors to a {@link CriticalWorkerRegistry} and keeps their heartbeats fresh.
 *
 * <p>On start, every event executor of every event loop group supplied by the {@link NettyBootstrapFactory} is wrapped
 * into a {@link CriticalWorker} and registered. A periodic task on the supplied scheduler then submits a tiny
 * timestamp-updating task to each executor. The recorded timestamp therefore only advances while the executor
 * actually runs the tasks submitted to it.
 *
 * <p>On stop, the periodic task is cancelled and the workers are unregistered.
 */
public class NettyWorkersRegistrar implements IgniteComponent {
    private static final IgniteLogger LOG = Loggers.forClass(NettyWorkersRegistrar.class);

    private final CriticalWorkerRegistry criticalWorkerRegistry;

    private final ScheduledExecutorService scheduler;

    private final NettyBootstrapFactory bootstrapFactory;

    private final CriticalWorkersConfiguration criticalWorkersConfiguration;

    private final FailureManager failureManager;

    /** Immutable snapshot of the registered workers; remains {@code null} until {@link #startAsync} assigns it. */
    private volatile List<NettyWorker> workers;

    /** Handle of the periodic heartbeat task; remains {@code null} until {@link #startAsync} schedules it. */
    @Nullable
    private volatile ScheduledFuture<?> heartbeatsTaskFuture;

    /**
     * Creates a registrar. Nothing is registered or scheduled until {@link #startAsync} is invoked.
     *
     * @param criticalWorkerRegistry Registry in which the Netty workers get registered.
     * @param scheduler Executor on which the periodic heartbeat-sending task is scheduled.
     * @param bootstrapFactory Source of the event loop groups whose executors are to be registered.
     * @param criticalWorkersConfiguration Configuration from which the heartbeat interval is read.
     * @param failureManager Receiver of a failure report when sending a heartbeat ends with an {@link Error}.
     */
    public NettyWorkersRegistrar(
            CriticalWorkerRegistry criticalWorkerRegistry,
            ScheduledExecutorService scheduler,
            NettyBootstrapFactory bootstrapFactory,
            CriticalWorkersConfiguration criticalWorkersConfiguration,
            FailureManager failureManager
    ) {
        this.criticalWorkerRegistry = criticalWorkerRegistry;
        this.scheduler = scheduler;
        this.bootstrapFactory = bootstrapFactory;
        this.criticalWorkersConfiguration = criticalWorkersConfiguration;
        this.failureManager = failureManager;
    }

    /**
     * Registers a worker per Netty event executor and schedules the periodic heartbeat task.
     *
     * <p>The configured heartbeat interval is interpreted as milliseconds and is used both as the initial delay and
     * as the period of the task.
     *
     * @param componentContext Component context (not used by this implementation).
     * @return An already completed future.
     */
    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        List<NettyWorker> registered = new ArrayList<>();

        for (EventLoopGroup eventLoopGroup : bootstrapFactory.eventLoopGroups()) {
            registerWorkersFor(eventLoopGroup, registered);
        }

        // Publish the snapshot before scheduling, so the heartbeat task never observes a null list.
        workers = List.copyOf(registered);

        long periodMillis = (Long) criticalWorkersConfiguration.nettyThreadsHeartbeatInterval().value();

        heartbeatsTaskFuture = scheduler.scheduleAtFixedRate(
                this::sendHeartbeats,
                periodMillis,
                periodMillis,
                TimeUnit.MILLISECONDS
        );

        return CompletableFutures.nullCompletedFuture();
    }

    /**
     * Wraps each executor of the group into a worker, registers the workers and appends them to the accumulator.
     *
     * @param group Event loop group; each of its executors is cast to {@link SingleThreadEventExecutor}.
     * @param nettyWorkers Accumulator that receives the workers created for this group.
     */
    private void registerWorkersFor(EventLoopGroup group, List<NettyWorker> nettyWorkers) {
        // Wrap everything first: a failing cast then happens before anything from this group reaches the registry.
        List<NettyWorker> wrapped = new ArrayList<>();

        for (EventExecutor candidate : group) {
            wrapped.add(new NettyWorker((SingleThreadEventExecutor) candidate));
        }

        wrapped.forEach(worker -> criticalWorkerRegistry.register(worker));

        nettyWorkers.addAll(wrapped);
    }

    /** Body of the periodic task: asks every registered worker to refresh its heartbeat. */
    private void sendHeartbeats() {
        for (NettyWorker worker : workers) {
            sendHeartbeatTo(worker);
        }
    }

    /**
     * Submits the heartbeat task to a single worker.
     *
     * <p>An {@link Exception} or an {@link AssertionError} is logged as a warning and swallowed, so the remaining
     * workers still get their heartbeats. Any other {@link Error} is logged, handed to the failure manager as a
     * critical error and rethrown; letting it escape the periodic task is what ends further heartbeat rounds.
     *
     * @param worker Worker whose executor should receive the heartbeat task.
     */
    private void sendHeartbeatTo(NettyWorker worker) {
        try {
            worker.sendHeartbeat();
        } catch (Exception | AssertionError e) {
            LOG.warn("Cannot send a heartbeat to a Netty thread [threadId={}].", e, worker.threadId());
        } catch (Error e) {
            LOG.error(
                    "Cannot send a heartbeat to a Netty thread, no more heartbeats will be sent [threadId={}].",
                    e,
                    worker.threadId()
            );

            failureManager.process(new FailureContext(FailureType.CRITICAL_ERROR, e));

            throw e;
        }
    }

    /**
     * Cancels the periodic heartbeat task (without interrupting a round that is in progress) and unregisters the
     * workers. Both steps are skipped when the component has not been started.
     *
     * @param componentContext Component context (not used by this implementation).
     * @return An already completed future.
     */
    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        ScheduledFuture<?> scheduledHeartbeats = heartbeatsTaskFuture;

        if (scheduledHeartbeats != null) {
            scheduledHeartbeats.cancel(false);
        }

        List<NettyWorker> toUnregister = workers;

        if (toUnregister != null) {
            toUnregister.forEach(worker -> criticalWorkerRegistry.unregister(worker));
        }

        return CompletableFutures.nullCompletedFuture();
    }

    /** Critical worker view of a single Netty event executor. */
    private static class NettyWorker implements CriticalWorker {
        private final SingleThreadEventExecutor eventExecutor;

        /** The one task instance that is resubmitted to the executor on every heartbeat round. */
        private final Runnable sendHeartbeatTask;

        /** {@link System#nanoTime()} reading taken at creation and on each run of the heartbeat task. */
        private volatile long heartbeatNanos;

        private NettyWorker(SingleThreadEventExecutor eventExecutor) {
            this.heartbeatNanos = System.nanoTime();
            this.sendHeartbeatTask = this::refreshHeartbeat;
            this.eventExecutor = eventExecutor;
        }

        /** Returns the thread id reported by the thread properties of the wrapped executor. */
        public long threadId() {
            return eventExecutor.threadProperties().id();
        }

        /** Returns the most recently recorded heartbeat timestamp, in {@link System#nanoTime()} units. */
        public long heartbeatNanos() {
            return heartbeatNanos;
        }

        /** Hands the heartbeat task over to the wrapped executor. */
        private void sendHeartbeat() {
            eventExecutor.execute(sendHeartbeatTask);
        }

        /** Records the current time as the latest heartbeat. */
        private void refreshHeartbeat() {
            heartbeatNanos = System.nanoTime();
        }
    }
}

