/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.channel;

import io.vlingo.actors.Actor;
import io.vlingo.actors.Stoppable;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.common.pool.ResourcePool;
import io.vlingo.wire.channel.RefreshableSelector;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;
import io.vlingo.wire.channel.RequestResponseContext;
import io.vlingo.wire.channel.ResponseSenderChannel;
import io.vlingo.wire.channel.SocketChannelSelectionProcessor;
import io.vlingo.wire.message.BasicConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.Converters;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

public class SocketChannelSelectionProcessorActor
extends Actor
implements SocketChannelSelectionProcessor,
ResponseSenderChannel,
Scheduled<Object>,
Stoppable {
    private final Cancellable cancellable;
    private int contextId;
    private final String name;
    private final long probeTimeout;
    private final RequestChannelConsumerProvider provider;
    private final ResourcePool<ConsumerByteBuffer, String> requestBufferPool;
    private final ResponseSenderChannel responder;
    private final RefreshableSelector selector;
    private final LinkedList<Context> writableContexts;

    public SocketChannelSelectionProcessorActor(RequestChannelConsumerProvider provider, String name, ResourcePool<ConsumerByteBuffer, String> requestBufferPool, long probeInterval, long probeTimeout) {
        this.logger().debug("Probe interval: " + probeInterval + " Probe timeout: " + probeTimeout);
        this.provider = provider;
        this.name = name;
        this.requestBufferPool = requestBufferPool;
        this.probeTimeout = probeTimeout;
        this.selector = RefreshableSelector.open(name);
        this.responder = (ResponseSenderChannel)this.selfAs(ResponseSenderChannel.class);
        this.writableContexts = new LinkedList();
        this.cancellable = this.stage().scheduler().schedule((Scheduled)this.selfAs(Scheduled.class), null, 100L, probeInterval);
    }

    @Override
    public void abandon(RequestResponseContext<?> context) {
        ((Context)context).close();
    }

    @Override
    public void close() {
        if (this.isStopped()) {
            return;
        }
        ((Stoppable)this.selfAs(Stoppable.class)).stop();
    }

    @Override
    public void respondWith(RequestResponseContext<?> context, ConsumerByteBuffer buffer) {
        this.respondWith(context, buffer, false);
    }

    @Override
    public void respondWith(RequestResponseContext<?> context, ConsumerByteBuffer buffer, boolean closeFollowing) {
        Context internalContext = (Context)context;
        internalContext.queueWritable(buffer);
        internalContext.requireExplicitClose(!closeFollowing);
    }

    @Override
    public void respondWith(RequestResponseContext<?> context, Object response, boolean closeFollowing) {
        String textResponse = response.toString();
        ConsumerByteBuffer buffer = new BasicConsumerByteBuffer(0, textResponse.length() + 1024).put(Converters.textToBytes(textResponse)).flip();
        this.respondWith(context, buffer, closeFollowing);
    }

    @Override
    public void process(SocketChannel clientChannel) {
        try {
            this.selector.registerWith(clientChannel, 1, new Context(clientChannel));
        }
        catch (Exception e) {
            String message = "Failed to accept client socket for " + this.name + " because: " + e.getMessage();
            this.logger().error(message, (Throwable)e);
            throw new IllegalArgumentException(message);
        }
    }

    public void intervalSignal(Scheduled<Object> scheduled, Object data) {
        this.probeChannel();
    }

    public void stop() {
        this.cancellable.cancel();
        try {
            this.selector.close();
        }
        catch (Exception e) {
            this.logger().error("Failed to close selector for " + this.name + " while stopping because: " + e.getMessage(), (Throwable)e);
        }
    }

    private void closeForPeerDisconnect(Context context, SelectionKey key) {
        try {
            context.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            key.cancel();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void probeChannel() {
        if (this.isStopped()) {
            return;
        }
        try {
            Iterator<SelectionKey> iterator = this.selector.select(this.probeTimeout);
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (!key.isValid()) continue;
                if (key.isReadable()) {
                    this.read(key);
                    continue;
                }
                if (!key.isWritable()) continue;
                this.write(key);
            }
            while (!this.writableContexts.isEmpty()) {
                this.write(this.writableContexts.poll());
            }
        }
        catch (ClosedSelectorException e) {
            this.logger().error("Failed client channel processing for " + this.name + " because selector is closed.");
        }
        catch (Exception e) {
            this.logger().error("Failed client channel processing for " + this.name + " because: " + e.getMessage(), (Throwable)e);
        }
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel)key.channel();
        if (!channel.isOpen()) {
            key.cancel();
            return;
        }
        Context context = (Context)key.attachment();
        ConsumerByteBuffer buffer = context.requestBuffer().clear();
        ByteBuffer readBuffer = buffer.asByteBuffer();
        int totalBytesRead = 0;
        int bytesRead = 0;
        try {
            do {
                bytesRead = channel.read(readBuffer);
                totalBytesRead += bytesRead;
            } while (bytesRead > 0);
        }
        catch (Exception e) {
            bytesRead = -1;
        }
        if (bytesRead == -1) {
            this.closeForPeerDisconnect(context, key);
        }
        if (totalBytesRead > 0) {
            context.consumer().consume(context, buffer.flip());
        } else {
            context.close();
        }
    }

    private void write(SelectionKey key) throws Exception {
        this.write((Context)key.attachment());
    }

    private void write(Context context) throws Exception {
        if (context.isChannelClosed()) {
            context.close();
            return;
        }
        if (!context.writeMode && context.hasNextWritable()) {
            this.writeWithCachedData(context, context.clientChannel);
        }
        context.eagerClose();
    }

    private void writeWithCachedData(Context context, SocketChannel channel) throws Exception {
        ConsumerByteBuffer buffer = context.nextWritable();
        while (buffer != null) {
            this.writeWithCachedData(context, channel, buffer);
            buffer = context.nextWritable();
        }
    }

    private void writeWithCachedData(Context context, SocketChannel clientChannel, ConsumerByteBuffer buffer) throws Exception {
        try {
            ByteBuffer responseBuffer = buffer.asByteBuffer();
            while (responseBuffer.hasRemaining()) {
                if (clientChannel.write(responseBuffer) >= 1) continue;
                context.setWriteMode(true);
                return;
            }
            context.confirmCurrentWritable(buffer);
        }
        catch (Exception e) {
            this.logger().error("Failed to write buffer for " + this.name + " with channel " + clientChannel.getRemoteAddress() + " because: " + e.getMessage(), (Throwable)e);
        }
    }

    private class Context
    implements RequestResponseContext<SocketChannel> {
        private final SocketChannel clientChannel;
        private Object closingData;
        private final RequestChannelConsumer consumer;
        private Object consumerData;
        private final String id;
        private boolean requireExplicitClose;
        private final Queue<ConsumerByteBuffer> writables;
        private boolean writeMode;

        @Override
        public <T> T consumerData() {
            return (T)this.consumerData;
        }

        @Override
        public <T> T consumerData(T workingData) {
            this.consumerData = workingData;
            return workingData;
        }

        @Override
        public boolean hasConsumerData() {
            return this.consumerData != null;
        }

        @Override
        public String id() {
            return this.id;
        }

        @Override
        public ResponseSenderChannel sender() {
            return SocketChannelSelectionProcessorActor.this.responder;
        }

        @Override
        public void whenClosing(Object data) {
            this.closingData = data;
        }

        Context(SocketChannel clientChannel) {
            this.clientChannel = clientChannel;
            this.consumer = SocketChannelSelectionProcessorActor.this.provider.requestChannelConsumer();
            this.id = "" + ++SocketChannelSelectionProcessorActor.this.contextId;
            this.requireExplicitClose = true;
            this.writables = new LinkedList<ConsumerByteBuffer>();
            this.writeMode = false;
        }

        boolean isChannelClosed() {
            return !this.clientChannel.isOpen();
        }

        boolean isChannelOpen() {
            return this.clientChannel.isOpen();
        }

        void close() {
            block2: {
                try {
                    this.consumer().closeWith(this, this.closingData);
                    this.whenClosing(null);
                    SocketChannelSelectionProcessorActor.this.selector.keyFor(this.clientChannel).cancel();
                    this.clientChannel.close();
                }
                catch (Exception e) {
                    if (!this.hasNextWritable()) break block2;
                    SocketChannelSelectionProcessorActor.this.logger().info("Client channel didn't close normally and still has writable data.");
                }
            }
        }

        void eagerClose() {
            if (this.requireExplicitClose) {
                return;
            }
            if (this.isChannelOpen()) {
                this.close();
            }
        }

        RequestChannelConsumer consumer() {
            return this.consumer;
        }

        void confirmCurrentWritable(ConsumerByteBuffer buffer) {
            try {
                buffer.release();
            }
            catch (Exception exception) {
                // empty catch block
            }
            try {
                this.setWriteMode(false);
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.writables.poll();
        }

        boolean hasNextWritable() {
            return this.writables.peek() != null;
        }

        void requireExplicitClose(boolean option) {
            this.requireExplicitClose = option;
        }

        ConsumerByteBuffer nextWritable() {
            return this.writables.peek();
        }

        void queueWritable(ConsumerByteBuffer buffer) {
            this.writables.add(buffer);
            if (!this.writeMode) {
                SocketChannelSelectionProcessorActor.this.writableContexts.add(this);
            }
        }

        ConsumerByteBuffer requestBuffer() {
            return (ConsumerByteBuffer)SocketChannelSelectionProcessorActor.this.requestBufferPool.acquire((Object)"SocketChannelSelectionProcessorActor#Context");
        }

        void setWriteMode(boolean on) throws ClosedChannelException {
            int options = 1 | (on ? 4 : 0);
            SocketChannelSelectionProcessorActor.this.selector.registerWith(this.clientChannel, options, this);
            this.writeMode = on;
        }
    }
}

