/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.fdx.bidirectional;

import io.vlingo.actors.Actor;
import io.vlingo.actors.ActorInstantiator;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Stoppable;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduled;
import io.vlingo.common.pool.ElasticResourcePool;
import io.vlingo.common.pool.ResourcePool;
import io.vlingo.wire.channel.RefreshableSelector;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;
import io.vlingo.wire.channel.SocketChannelSelectionProcessor;
import io.vlingo.wire.channel.SocketChannelSelectionProcessorActor;
import io.vlingo.wire.fdx.bidirectional.ServerRequestResponseChannel;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBufferPool;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

@Deprecated
public class ServerRequestResponseChannelActor
extends Actor
implements ServerRequestResponseChannel,
Scheduled<Object> {
    private final Cancellable cancellable;
    private final ServerSocketChannel channel;
    private final String name;
    private final SocketChannelSelectionProcessor[] processors;
    private final int port;
    private int processorPoolIndex;
    private final long probeTimeout;
    private final ConsumerByteBufferPool requestBufferPool;
    private final RefreshableSelector selector;

    public ServerRequestResponseChannelActor(RequestChannelConsumerProvider provider, int port, String name, int processorPoolSize, int maxBufferPoolSize, int maxMessageSize, long probeInterval, long probeTimeout) {
        this.name = name;
        this.probeTimeout = probeTimeout;
        try {
            this.requestBufferPool = new ConsumerByteBufferPool(ElasticResourcePool.Config.of((int)maxBufferPoolSize), maxMessageSize);
            this.processors = this.startProcessors(provider, name, processorPoolSize, (ResourcePool<ConsumerByteBuffer, String>)this.requestBufferPool, probeInterval, probeTimeout);
            this.port = port;
            this.logger().info(this.getClass().getSimpleName() + ": OPENING PORT: " + this.port);
            this.channel = ServerSocketChannel.open();
            this.selector = RefreshableSelector.open(name);
            this.channel.socket().bind(new InetSocketAddress(port));
            this.channel.configureBlocking(false);
            this.selector.registerWith(this.channel, 16);
        }
        catch (Exception e) {
            String message = "Failure opening socket because: " + e.getMessage();
            this.logger().error(message, (Throwable)e);
            throw new IllegalArgumentException(message);
        }
        this.cancellable = this.stage().scheduler().schedule((Scheduled)this.selfAs(Scheduled.class), null, 100L, probeInterval);
    }

    @Override
    public void close() {
        if (this.isStopped()) {
            return;
        }
        ((Stoppable)this.selfAs(Stoppable.class)).stop();
    }

    @Override
    public Completes<Integer> port() {
        return this.completes().with((Object)this.port);
    }

    public void intervalSignal(Scheduled<Object> scheduled, Object data) {
        this.probeChannel();
    }

    public void stop() {
        this.cancellable.cancel();
        for (SocketChannelSelectionProcessor processor : this.processors) {
            processor.close();
        }
        try {
            this.selector.close();
        }
        catch (Exception e) {
            this.logger().error("Failed to close selector for: '" + this.name + "'", (Throwable)e);
        }
        try {
            this.channel.close();
        }
        catch (Exception e) {
            this.logger().error("Failed to close channel for: '" + this.name + "'", (Throwable)e);
        }
        super.stop();
    }

    private void probeChannel() {
        if (this.isStopped()) {
            return;
        }
        try {
            Iterator<SelectionKey> iterator = this.selector.select(this.probeTimeout);
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (!key.isValid() || !key.isAcceptable()) continue;
                this.accept(key);
            }
        }
        catch (Exception e) {
            this.logger().error(this.getClass().getSimpleName() + ": Failed to accept client channel for '" + this.name + "' because: " + e.getMessage(), (Throwable)e);
        }
    }

    private void accept(SelectionKey key) {
        ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
        try {
            SocketChannel clientChannel;
            if (serverChannel.isOpen() && (clientChannel = serverChannel.accept()) != null) {
                clientChannel.configureBlocking(false);
                this.pooledProcessor().process(clientChannel);
            }
        }
        catch (Exception e) {
            String message = this.getClass().getSimpleName() + ": Failed to accept client socket for " + this.name + " because: " + e.getMessage();
            this.logger().error(message, (Throwable)e);
            throw new IllegalArgumentException(message);
        }
    }

    private SocketChannelSelectionProcessor pooledProcessor() {
        if (this.processorPoolIndex >= this.processors.length) {
            this.processorPoolIndex = 0;
        }
        return this.processors[this.processorPoolIndex++];
    }

    private SocketChannelSelectionProcessor[] startProcessors(RequestChannelConsumerProvider provider, String name, int processorPoolSize, ResourcePool<ConsumerByteBuffer, String> requestBufferPool, long probeInterval, long probeTimeout) throws Exception {
        SocketChannelSelectionProcessor[] processors = new SocketChannelSelectionProcessor[processorPoolSize];
        try {
            for (int idx = 0; idx < processors.length; ++idx) {
                processors[idx] = (SocketChannelSelectionProcessor)this.childActorFor(SocketChannelSelectionProcessor.class, Definition.has(SocketChannelSelectionProcessorActor.class, (ActorInstantiator)new SocketChannelSelectionProcessor.SocketChannelSelectionProcessorInstantiator(provider, name + "-processor-" + idx, requestBufferPool, probeInterval, probeTimeout)));
            }
        }
        catch (Exception e) {
            this.logger().error(this.getClass().getSimpleName() + "FATAL: Socket channel processors cannot be started because: " + e.getMessage(), (Throwable)e);
            throw e;
        }
        return processors;
    }
}

