/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.nio.tcp.nonblocking;

import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.OutboundFrame;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.ascii.TextWriteHandler;
import com.hazelcast.nio.tcp.ClientWriteHandler;
import com.hazelcast.nio.tcp.SocketWriter;
import com.hazelcast.nio.tcp.TcpIpConnection;
import com.hazelcast.nio.tcp.WriteHandler;
import com.hazelcast.nio.tcp.nonblocking.AbstractHandler;
import com.hazelcast.nio.tcp.nonblocking.NonBlockingIOThread;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.StringUtil;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

public final class NonBlockingSocketWriter
extends AbstractHandler
implements Runnable,
SocketWriter {
    private static final long TIMEOUT = 3L;
    @Probe(name="writeQueueSize")
    public final Queue<OutboundFrame> writeQueue = new ConcurrentLinkedQueue<OutboundFrame>();
    @Probe(name="priorityWriteQueueSize")
    public final Queue<OutboundFrame> urgentWriteQueue = new ConcurrentLinkedQueue<OutboundFrame>();
    @Probe(name="eventCount")
    private final SwCounter eventCount = SwCounter.newSwCounter();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private ByteBuffer outputBuffer;
    @Probe(name="bytesWritten")
    private final SwCounter bytesWritten = SwCounter.newSwCounter();
    @Probe(name="normalFramesWritten")
    private final SwCounter normalFramesWritten = SwCounter.newSwCounter();
    @Probe(name="priorityFramesWritten")
    private final SwCounter priorityFramesWritten = SwCounter.newSwCounter();
    private final MetricsRegistry metricsRegistry;
    private volatile OutboundFrame currentFrame;
    private WriteHandler writeHandler;
    private volatile long lastWriteTime;
    private boolean shutdown;
    private NonBlockingIOThread newOwner;

    NonBlockingSocketWriter(TcpIpConnection connection, NonBlockingIOThread ioThread, MetricsRegistry metricsRegistry) {
        super(connection, ioThread, 4);
        this.metricsRegistry = metricsRegistry;
        metricsRegistry.scanAndRegister(this, "tcp.connection[" + connection.getMetricsId() + "].out");
    }

    @Override
    public int totalFramesPending() {
        return this.writeQueue.size() + this.urgentWriteQueue.size();
    }

    @Override
    public long getLastWriteTimeMillis() {
        return this.lastWriteTime;
    }

    @Override
    public WriteHandler getWriteHandler() {
        return this.writeHandler;
    }

    @Probe(name="writeQueuePendingBytes", level=ProbeLevel.DEBUG)
    public long bytesPending() {
        return this.bytesPending(this.writeQueue);
    }

    @Probe(name="priorityWriteQueuePendingBytes", level=ProbeLevel.DEBUG)
    public long priorityBytesPending() {
        return this.bytesPending(this.urgentWriteQueue);
    }

    private long bytesPending(Queue<OutboundFrame> writeQueue) {
        long bytesPending = 0L;
        for (OutboundFrame frame : writeQueue) {
            if (!(frame instanceof Packet)) continue;
            bytesPending += (long)((Packet)frame).packetSize();
        }
        return bytesPending;
    }

    @Probe(name="idleTimeMs", level=ProbeLevel.DEBUG)
    private long idleTimeMs() {
        return Math.max(System.currentTimeMillis() - this.lastWriteTime, 0L);
    }

    @Probe(name="isScheduled", level=ProbeLevel.DEBUG)
    private long isScheduled() {
        return this.scheduled.get() ? 1L : 0L;
    }

    @Override
    public void setProtocol(final String protocol) {
        final CountDownLatch latch = new CountDownLatch(1);
        this.ioThread.addTaskAndWakeup(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    NonBlockingSocketWriter.this.createWriterHandler(protocol);
                }
                catch (Throwable t) {
                    NonBlockingSocketWriter.this.onFailure(t);
                }
                finally {
                    latch.countDown();
                }
            }
        });
        try {
            latch.await(3L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            this.logger.finest("CountDownLatch::await interrupted", e);
        }
    }

    private void createWriterHandler(String protocol) throws IOException {
        if (this.writeHandler == null) {
            if ("HZC".equals(protocol)) {
                this.configureBuffers(this.ioService.getSocketSendBufferSize() * 1024);
                this.writeHandler = this.ioService.createWriteHandler(this.connection);
                this.outputBuffer.put(StringUtil.stringToBytes("HZC"));
                this.registerOp(4);
            } else if ("CB2".equals(protocol)) {
                this.configureBuffers(this.ioService.getSocketClientReceiveBufferSize() * 1024);
                this.writeHandler = new ClientWriteHandler();
            } else {
                this.configureBuffers(this.ioService.getSocketClientSendBufferSize() * 1024);
                this.writeHandler = new TextWriteHandler(this.connection);
            }
        }
    }

    private void configureBuffers(int size) {
        this.outputBuffer = IOUtil.newByteBuffer(size, this.ioService.isSocketBufferDirect());
        try {
            this.connection.setSendBufferSize(size);
        }
        catch (SocketException e) {
            this.logger.finest("Failed to adjust TCP send buffer of " + this.connection + " to " + size + " B.", e);
        }
    }

    @Override
    public void write(OutboundFrame frame) {
        if (frame.isUrgent()) {
            this.urgentWriteQueue.offer(frame);
        } else {
            this.writeQueue.offer(frame);
        }
        this.schedule();
    }

    private OutboundFrame poll() {
        OutboundFrame frame;
        boolean urgent;
        while (true) {
            urgent = true;
            frame = this.urgentWriteQueue.poll();
            if (frame == null) {
                urgent = false;
                frame = this.writeQueue.poll();
            }
            if (frame == null) {
                return null;
            }
            if (frame.getClass() != TaskFrame.class) break;
            TaskFrame taskFrame = (TaskFrame)frame;
            taskFrame.task.run();
        }
        if (urgent) {
            this.priorityFramesWritten.inc();
        } else {
            this.normalFramesWritten.inc();
        }
        return frame;
    }

    private void schedule() {
        if (this.scheduled.get()) {
            return;
        }
        if (!this.scheduled.compareAndSet(false, true)) {
            return;
        }
        this.ioThread.addTaskAndWakeup(this);
    }

    private void unschedule() throws IOException {
        if (this.dirtyOutputBuffer() || this.currentFrame != null) {
            this.registerOp(4);
            return;
        }
        this.unregisterOp(4);
        this.scheduled.set(false);
        if (this.writeQueue.isEmpty() && this.urgentWriteQueue.isEmpty()) {
            return;
        }
        if (!this.scheduled.compareAndSet(false, true)) {
            return;
        }
        this.ioThread.addTask(this);
    }

    @Override
    public long getEventCount() {
        return this.eventCount.get();
    }

    @Override
    public void handle() throws Exception {
        this.eventCount.inc();
        this.lastWriteTime = System.currentTimeMillis();
        if (this.shutdown) {
            return;
        }
        if (this.writeHandler == null) {
            this.logger.log(Level.WARNING, "SocketWriter is not set, creating SocketWriter with CLUSTER protocol!");
            this.createWriterHandler("HZC");
        }
        this.fillOutputBuffer();
        if (this.dirtyOutputBuffer()) {
            this.writeOutputBufferToSocket();
        }
        if (this.newOwner == null) {
            this.unschedule();
        } else {
            this.startMigration();
        }
    }

    private void startMigration() throws IOException {
        NonBlockingIOThread newOwner = this.newOwner;
        this.newOwner = null;
        this.startMigration(newOwner);
    }

    private boolean dirtyOutputBuffer() {
        return this.outputBuffer.position() > 0;
    }

    private void writeOutputBufferToSocket() throws IOException {
        this.outputBuffer.flip();
        int written = this.socketChannel.write(this.outputBuffer);
        this.bytesWritten.inc(written);
        if (this.outputBuffer.hasRemaining()) {
            this.outputBuffer.compact();
        } else {
            this.outputBuffer.clear();
        }
    }

    private void fillOutputBuffer() throws Exception {
        while (this.outputBuffer.hasRemaining()) {
            if (this.currentFrame == null) {
                this.currentFrame = this.poll();
                if (this.currentFrame == null) {
                    return;
                }
            }
            if (!this.writeHandler.onWrite(this.currentFrame, this.outputBuffer)) {
                return;
            }
            this.currentFrame = null;
        }
        return;
    }

    @Override
    public void run() {
        try {
            this.handle();
        }
        catch (Throwable t) {
            this.onFailure(t);
        }
    }

    @Override
    public void close() {
        this.metricsRegistry.deregister(this);
        this.writeQueue.clear();
        this.urgentWriteQueue.clear();
        CloseTask closeTask = new CloseTask();
        this.write(new TaskFrame(closeTask));
        closeTask.awaitCompletion();
    }

    @Override
    public void requestMigration(NonBlockingIOThread newOwner) {
        this.write(new TaskFrame(new StartMigrationTask(newOwner)));
    }

    public String toString() {
        return this.connection + ".socketWriter";
    }

    private class CloseTask
    implements Runnable {
        private final CountDownLatch latch = new CountDownLatch(1);

        private CloseTask() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            NonBlockingSocketWriter.this.shutdown = true;
            try {
                NonBlockingSocketWriter.this.socketChannel.closeOutbound();
            }
            catch (IOException e) {
                NonBlockingSocketWriter.this.logger.finest("Error while closing outbound", e);
            }
            finally {
                this.latch.countDown();
            }
        }

        void awaitCompletion() {
            try {
                this.latch.await(3L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                EmptyStatement.ignore(e);
            }
        }
    }

    private final class StartMigrationTask
    implements Runnable {
        private final NonBlockingIOThread theNewOwner;

        public StartMigrationTask(NonBlockingIOThread theNewOwner) {
            this.theNewOwner = theNewOwner;
        }

        @Override
        public void run() {
            assert (NonBlockingSocketWriter.this.newOwner == null) : "No migration can be in progress";
            if (NonBlockingSocketWriter.this.ioThread == this.theNewOwner) {
                return;
            }
            NonBlockingSocketWriter.this.newOwner = this.theNewOwner;
        }
    }

    private static final class TaskFrame
    implements OutboundFrame {
        private final Runnable task;

        private TaskFrame(Runnable task) {
            this.task = task;
        }

        @Override
        public boolean isUrgent() {
            return true;
        }
    }
}

