/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.internal.networking.spinning;

import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.ChannelInitializer;
import com.hazelcast.internal.networking.ChannelOutboundHandler;
import com.hazelcast.internal.networking.InitResult;
import com.hazelcast.internal.networking.OutboundFrame;
import com.hazelcast.internal.networking.spinning.AbstractHandler;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Packet;
import com.hazelcast.util.EmptyStatement;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SpinningChannelWriter
extends AbstractHandler {
    private static final long TIMEOUT = 3L;
    @Probe(name="writeQueueSize")
    public final Queue<OutboundFrame> writeQueue = new ConcurrentLinkedQueue<OutboundFrame>();
    @Probe(name="priorityWriteQueueSize")
    public final Queue<OutboundFrame> urgentWriteQueue = new ConcurrentLinkedQueue<OutboundFrame>();
    private final ChannelInitializer initializer;
    private ByteBuffer outputBuffer;
    @Probe(name="bytesWritten")
    private final SwCounter bytesWritten = SwCounter.newSwCounter();
    @Probe(name="normalFramesWritten")
    private final SwCounter normalFramesWritten = SwCounter.newSwCounter();
    @Probe(name="priorityFramesWritten")
    private final SwCounter priorityFramesWritten = SwCounter.newSwCounter();
    private volatile long lastWriteTime;
    private ChannelOutboundHandler outboundHandler;
    private volatile OutboundFrame currentFrame;

    public SpinningChannelWriter(Channel channel, ILogger logger, ChannelErrorHandler errorHandler, ChannelInitializer initializer) {
        super(channel, logger, errorHandler);
        this.initializer = initializer;
    }

    public void write(OutboundFrame frame) {
        if (frame.isUrgent()) {
            this.urgentWriteQueue.add(frame);
        } else {
            this.writeQueue.add(frame);
        }
    }

    @Probe(name="writeQueuePendingBytes")
    public long bytesPending() {
        return this.bytesPending(this.writeQueue);
    }

    @Probe(name="priorityWriteQueuePendingBytes")
    public long priorityBytesPending() {
        return this.bytesPending(this.urgentWriteQueue);
    }

    @Probe
    private long idleTimeMs() {
        return Math.max(System.currentTimeMillis() - this.lastWriteTime, 0L);
    }

    public int totalFramesPending() {
        return this.urgentWriteQueue.size() + this.writeQueue.size();
    }

    private long bytesPending(Queue<OutboundFrame> writeQueue) {
        long bytesPending = 0L;
        for (OutboundFrame frame : writeQueue) {
            if (!(frame instanceof Packet)) continue;
            bytesPending += (long)((Packet)frame).packetSize();
        }
        return bytesPending;
    }

    public long lastWriteTimeMillis() {
        return this.lastWriteTime;
    }

    private OutboundFrame poll() {
        OutboundFrame frame;
        boolean urgent;
        while (true) {
            urgent = true;
            frame = this.urgentWriteQueue.poll();
            if (frame == null) {
                urgent = false;
                frame = this.writeQueue.poll();
            }
            if (frame == null) {
                return null;
            }
            if (frame.getClass() != TaskFrame.class) break;
            TaskFrame taskFrame = (TaskFrame)frame;
            taskFrame.task.run();
        }
        if (urgent) {
            this.priorityFramesWritten.inc();
        } else {
            this.normalFramesWritten.inc();
        }
        return frame;
    }

    public void close() {
        this.writeQueue.clear();
        this.urgentWriteQueue.clear();
        ShutdownTask shutdownTask = new ShutdownTask();
        this.write(new TaskFrame(shutdownTask));
        shutdownTask.awaitCompletion();
    }

    public void write() throws Exception {
        if (this.channel.isClosed()) {
            return;
        }
        if (this.outboundHandler == null && !this.init()) {
            return;
        }
        this.fillOutputBuffer();
        if (this.dirtyOutputBuffer()) {
            this.writeOutputBufferToSocket();
        }
    }

    private boolean init() throws IOException {
        InitResult<ChannelOutboundHandler> init = this.initializer.initOutbound(this.channel);
        if (init == null) {
            return false;
        }
        this.outputBuffer = init.getByteBuffer();
        this.outboundHandler = init.getHandler();
        return true;
    }

    private boolean dirtyOutputBuffer() {
        if (this.outputBuffer == null) {
            return false;
        }
        return this.outputBuffer.position() > 0;
    }

    private void fillOutputBuffer() throws Exception {
        while (this.outputBuffer == null || this.outputBuffer.hasRemaining()) {
            if (this.currentFrame == null) {
                this.currentFrame = this.poll();
                if (this.currentFrame == null) {
                    return;
                }
            }
            if (!this.outboundHandler.onWrite(this.currentFrame, this.outputBuffer)) {
                return;
            }
            this.currentFrame = null;
        }
        return;
    }

    private void writeOutputBufferToSocket() throws Exception {
        this.outputBuffer.flip();
        int result = this.channel.write(this.outputBuffer);
        if (result > 0) {
            this.lastWriteTime = System.currentTimeMillis();
            this.bytesWritten.inc(result);
        }
        if (this.outputBuffer.hasRemaining()) {
            this.outputBuffer.compact();
        } else {
            this.outputBuffer.clear();
        }
    }

    private class ShutdownTask
    implements Runnable {
        private final CountDownLatch latch = new CountDownLatch(1);

        private ShutdownTask() {
        }

        @Override
        public void run() {
            try {
                SpinningChannelWriter.this.channel.closeOutbound();
            }
            catch (IOException e) {
                SpinningChannelWriter.this.logger.finest("Error while closing outbound", e);
            }
            finally {
                this.latch.countDown();
            }
        }

        void awaitCompletion() {
            try {
                this.latch.await(3L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                EmptyStatement.ignore(e);
            }
        }
    }

    private static final class TaskFrame
    implements OutboundFrame {
        private final Runnable task;

        private TaskFrame(Runnable task) {
            this.task = task;
        }

        @Override
        public boolean isUrgent() {
            return true;
        }
    }
}

