/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.internal.networking.nonblocking;

import com.hazelcast.instance.HazelcastThreadGroup;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.networking.IOOutOfMemoryHandler;
import com.hazelcast.internal.networking.IOThreadingModel;
import com.hazelcast.internal.networking.SocketConnection;
import com.hazelcast.internal.networking.SocketReader;
import com.hazelcast.internal.networking.SocketReaderInitializer;
import com.hazelcast.internal.networking.SocketWriter;
import com.hazelcast.internal.networking.SocketWriterInitializer;
import com.hazelcast.internal.networking.nonblocking.MigratableHandler;
import com.hazelcast.internal.networking.nonblocking.NonBlockingIOThread;
import com.hazelcast.internal.networking.nonblocking.NonBlockingSocketReader;
import com.hazelcast.internal.networking.nonblocking.NonBlockingSocketWriter;
import com.hazelcast.internal.networking.nonblocking.SelectorMode;
import com.hazelcast.internal.networking.nonblocking.iobalancer.IOBalancer;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.util.HashUtil;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
 * {@link IOThreadingModel} backed by two fixed-size pools of {@link NonBlockingIOThread}s:
 * one pool for input (reading) and one for output (writing).
 *
 * <p>The threads and the {@link IOBalancer} are created in {@link #start()}; until then the
 * thread arrays contain only {@code null} slots and {@link #getIOBalancer()} returns
 * {@code null}. {@link #shutdown()} shuts the threads down and clears the slots again, after
 * which {@link #newSocketReader(SocketConnection)} and {@link #newSocketWriter(SocketConnection)}
 * fail with an {@link IllegalStateException}.
 */
public class NonBlockingIOThreadingModel
implements IOThreadingModel {
    private final NonBlockingIOThread[] inputThreads;
    private final NonBlockingIOThread[] outputThreads;
    // Monotonic counters used to pick the thread for the next reader / writer.
    private final AtomicInteger nextInputThreadIndex = new AtomicInteger();
    private final AtomicInteger nextOutputThreadIndex = new AtomicInteger();
    private final ILogger logger;
    private final MetricsRegistry metricsRegistry;
    private final LoggingService loggingService;
    private final HazelcastThreadGroup hazelcastThreadGroup;
    private final IOOutOfMemoryHandler oomeHandler;
    private final int balanceIntervalSeconds;
    private final SocketWriterInitializer socketWriterInitializer;
    private final SocketReaderInitializer socketReaderInitializer;
    // Lazily resolved by getSelectorMode() unless set explicitly through setSelectorMode().
    private SelectorMode selectorMode;
    // Assigned in startIOBalancer(); null until start() has run.
    private volatile IOBalancer ioBalancer;
    private boolean selectorWorkaroundTest = Boolean.getBoolean("hazelcast.io.selector.workaround.test");

    /**
     * Creates the model. No threads are created or started here; that happens in {@link #start()}.
     *
     * @param loggingService          source of the loggers used by this model, its threads and handlers
     * @param metricsRegistry         registry the IO threads and the balancer are registered with
     * @param hazelcastThreadGroup    supplies the thread group and the thread name prefix
     * @param oomeHandler             handler passed to every IO thread
     * @param inputThreadCount        number of input threads (size of the input thread array)
     * @param outputThreadCount       number of output threads (size of the output thread array)
     * @param balanceIntervalSeconds  interval passed to the {@link IOBalancer}
     * @param socketWriterInitializer initializer passed to every created socket writer
     * @param socketReaderInitializer initializer passed to every created socket reader
     */
    public NonBlockingIOThreadingModel(LoggingService loggingService, MetricsRegistry metricsRegistry, HazelcastThreadGroup hazelcastThreadGroup, IOOutOfMemoryHandler oomeHandler, int inputThreadCount, int outputThreadCount, int balanceIntervalSeconds, SocketWriterInitializer socketWriterInitializer, SocketReaderInitializer socketReaderInitializer) {
        this.hazelcastThreadGroup = hazelcastThreadGroup;
        this.metricsRegistry = metricsRegistry;
        this.loggingService = loggingService;
        this.logger = loggingService.getLogger(NonBlockingIOThreadingModel.class);
        this.inputThreads = new NonBlockingIOThread[inputThreadCount];
        this.outputThreads = new NonBlockingIOThread[outputThreadCount];
        this.oomeHandler = oomeHandler;
        this.balanceIntervalSeconds = balanceIntervalSeconds;
        this.socketWriterInitializer = socketWriterInitializer;
        this.socketReaderInitializer = socketReaderInitializer;
    }

    /**
     * Returns the selector mode, falling back to {@link SelectorMode#getConfiguredValue()}
     * (and remembering the result) when none has been set explicitly.
     */
    private SelectorMode getSelectorMode() {
        if (this.selectorMode == null) {
            this.selectorMode = SelectorMode.getConfiguredValue();
        }
        return this.selectorMode;
    }

    /**
     * Overrides the selector mode handed to the IO threads created by {@link #start()}.
     *
     * @param mode the mode to use; {@code null} restores the lazy lookup of the configured value
     */
    public void setSelectorMode(SelectorMode mode) {
        this.selectorMode = mode;
    }

    /**
     * Overrides the flag that is forwarded to every IO thread created by {@link #start()}.
     * The default is read from the {@code hazelcast.io.selector.workaround.test} system property.
     */
    void setSelectorWorkaroundTest(boolean selectorWorkaroundTest) {
        this.selectorWorkaroundTest = selectorWorkaroundTest;
    }

    /** @return always {@code false} */
    @Override
    public boolean isBlocking() {
        return false;
    }

    /** @return the internal input thread array itself (not a copy); used only for testing */
    @SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="used only for testing")
    public NonBlockingIOThread[] getInputThreads() {
        return this.inputThreads;
    }

    /** @return the internal output thread array itself (not a copy); used only for testing */
    @SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="used only for testing")
    public NonBlockingIOThread[] getOutputThreads() {
        return this.outputThreads;
    }

    /** @return the balancer created by {@link #start()}, or {@code null} if not started yet */
    public IOBalancer getIOBalancer() {
        return this.ioBalancer;
    }

    /**
     * Creates, registers and starts all input threads, then all output threads, and finally
     * creates and starts the {@link IOBalancer}.
     */
    @Override
    public void start() {
        if (this.logger.isFineEnabled()) {
            this.logger.fine("TcpIpConnectionManager configured with Non Blocking IO-threading model: " + this.inputThreads.length + " input threads and " + this.outputThreads.length + " output threads");
        }
        // Resolve once so the log line and every thread see the same value.
        SelectorMode mode = this.getSelectorMode();
        // A mode other than SELECT is reported at INFO; SELECT only at FINE.
        this.logger.log(mode != SelectorMode.SELECT ? Level.INFO : Level.FINE, "IO threads selector mode is " + mode);
        this.startThreads(this.inputThreads, "in-", "tcp.inputThread", mode);
        this.startThreads(this.outputThreads, "out-", "tcp.outputThread", mode);
        this.startIOBalancer();
    }

    /**
     * Fills every slot of {@code threads} with a newly created, metrics-registered and started IO thread.
     *
     * @param threads       array to populate; its length determines how many threads are created
     * @param nameInfix     text placed between the thread-pool name prefix and the thread index
     * @param metricsPrefix metrics name prefix; the thread name in square brackets is appended
     * @param mode          selector mode passed to every thread
     */
    private void startThreads(NonBlockingIOThread[] threads, String nameInfix, String metricsPrefix, SelectorMode mode) {
        for (int i = 0; i < threads.length; ++i) {
            NonBlockingIOThread thread = new NonBlockingIOThread(this.hazelcastThreadGroup.getInternalThreadGroup(), this.hazelcastThreadGroup.getThreadPoolNamePrefix("IO") + nameInfix + i, this.loggingService.getLogger(NonBlockingIOThread.class), this.oomeHandler, mode);
            thread.id = i;
            thread.setSelectorWorkaroundTest(this.selectorWorkaroundTest);
            threads[i] = thread;
            this.metricsRegistry.scanAndRegister(thread, metricsPrefix + "[" + thread.getName() + "]");
            thread.start();
        }
    }

    /**
     * Reports the reader and writer of a new connection to the {@link IOBalancer}.
     * Both handlers are cast to {@link MigratableHandler}.
     */
    @Override
    public void onConnectionAdded(SocketConnection connection) {
        MigratableHandler reader = (MigratableHandler)connection.getSocketReader();
        MigratableHandler writer = (MigratableHandler)connection.getSocketWriter();
        this.ioBalancer.connectionAdded(reader, writer);
    }

    /**
     * Reports the reader and writer of a removed connection to the {@link IOBalancer}.
     * Both handlers are cast to {@link MigratableHandler}.
     */
    @Override
    public void onConnectionRemoved(SocketConnection connection) {
        MigratableHandler reader = (MigratableHandler)connection.getSocketReader();
        MigratableHandler writer = (MigratableHandler)connection.getSocketWriter();
        this.ioBalancer.connectionRemoved(reader, writer);
    }

    /** Creates the {@link IOBalancer} over both thread arrays, starts it and registers its metrics. */
    private void startIOBalancer() {
        this.ioBalancer = new IOBalancer(this.inputThreads, this.outputThreads, this.hazelcastThreadGroup, this.balanceIntervalSeconds, this.loggingService);
        this.ioBalancer.start();
        this.metricsRegistry.scanAndRegister(this.ioBalancer, "tcp.balancer");
    }

    /**
     * Stops the {@link IOBalancer} (if one was created) and shuts down all IO threads.
     * Safe to call on a model that was never started.
     */
    @Override
    public void shutdown() {
        // The balancer only exists after start(); shutting down a never-started model must not
        // fail, just as the per-thread shutdown below tolerates empty slots.
        IOBalancer balancer = this.ioBalancer;
        if (balancer != null) {
            balancer.stop();
        }
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Shutting down IO Threads... Total: " + (this.inputThreads.length + this.outputThreads.length));
        }
        this.shutdown(this.inputThreads);
        this.shutdown(this.outputThreads);
    }

    /** Shuts down every non-null thread in {@code threads} and clears each slot. */
    private void shutdown(NonBlockingIOThread[] threads) {
        for (int i = 0; i < threads.length; ++i) {
            NonBlockingIOThread ioThread = threads[i];
            if (ioThread != null) {
                ioThread.shutdown();
            }
            // Cleared so later newSocketReader/newSocketWriter calls detect the closed state.
            threads[i] = null;
        }
    }

    /**
     * Creates a writer for the connection, bound to the output thread selected from the
     * next value of an incrementing counter.
     *
     * @throws IllegalStateException if the selected thread slot is empty (not started or shut down)
     */
    @Override
    public SocketWriter newSocketWriter(SocketConnection connection) {
        int index = HashUtil.hashToIndex(this.nextOutputThreadIndex.getAndIncrement(), this.outputThreads.length);
        NonBlockingIOThread outputThread = this.outputThreads[index];
        if (outputThread == null) {
            throw new IllegalStateException("IO thread is closed!");
        }
        return new NonBlockingSocketWriter(connection, outputThread, this.loggingService.getLogger(NonBlockingSocketWriter.class), this.ioBalancer, this.socketWriterInitializer);
    }

    /**
     * Creates a reader for the connection, bound to the input thread selected from the
     * next value of an incrementing counter.
     *
     * @throws IllegalStateException if the selected thread slot is empty (not started or shut down)
     */
    @Override
    public SocketReader newSocketReader(SocketConnection connection) {
        int index = HashUtil.hashToIndex(this.nextInputThreadIndex.getAndIncrement(), this.inputThreads.length);
        NonBlockingIOThread inputThread = this.inputThreads[index];
        if (inputThread == null) {
            throw new IllegalStateException("IO thread is closed!");
        }
        return new NonBlockingSocketReader(connection, inputThread, this.loggingService.getLogger(NonBlockingSocketReader.class), this.ioBalancer, this.socketReaderInitializer);
    }
}

