/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Options;
import io.nats.client.StatisticsCollector;
import io.nats.client.impl.DataPort;
import io.nats.client.impl.MessageQueue;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.BuilderBase;
import io.nats.client.support.ByteArrayBuilder;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class NatsConnectionWriter
implements Runnable {
    private static final int BUFFER_BLOCK_SIZE = 256;
    private final NatsConnection connection;
    private final ReentrantLock writerLock;
    private Future<Boolean> stopped;
    private Future<DataPort> dataPortFuture;
    private DataPort dataPort;
    private final AtomicBoolean running;
    private final AtomicBoolean reconnectMode;
    private final ReentrantLock startStopLock;
    private byte[] sendBuffer;
    private final AtomicInteger sendBufferLength;
    private final MessageQueue outgoing;
    private final MessageQueue reconnectOutgoing;
    private final long reconnectBufferSize;

    NatsConnectionWriter(NatsConnection connection, NatsConnectionWriter sourceWriter) {
        this.connection = connection;
        this.writerLock = new ReentrantLock();
        this.running = new AtomicBoolean(false);
        this.reconnectMode = new AtomicBoolean(sourceWriter != null);
        this.startStopLock = new ReentrantLock();
        this.stopped = new CompletableFuture<Boolean>();
        ((CompletableFuture)this.stopped).complete(Boolean.TRUE);
        Options options = connection.getOptions();
        int sbl = BuilderBase.bufferAllocSize(options.getBufferSize(), 256);
        this.sendBufferLength = new AtomicInteger(sbl);
        this.sendBuffer = new byte[sbl];
        this.outgoing = new MessageQueue(true, options.getMaxMessagesInOutgoingQueue(), options.isDiscardMessagesWhenOutgoingQueueFull(), options.getRequestCleanupInterval(), sourceWriter == null ? null : sourceWriter.outgoing);
        this.reconnectOutgoing = new MessageQueue(true, options.getRequestCleanupInterval(), sourceWriter == null ? null : sourceWriter.reconnectOutgoing);
        this.reconnectBufferSize = options.getReconnectBufferSize();
    }

    void start(Future<DataPort> dataPortFuture) {
        this.startStopLock.lock();
        try {
            this.dataPortFuture = dataPortFuture;
            this.running.set(true);
            this.outgoing.resume();
            this.reconnectOutgoing.resume();
            this.stopped = this.connection.getExecutor().submit(this, Boolean.TRUE);
        }
        finally {
            this.startStopLock.unlock();
        }
    }

    Future<Boolean> stop() {
        if (this.running.get()) {
            this.running.set(false);
            this.startStopLock.lock();
            try {
                this.outgoing.pause();
                this.reconnectOutgoing.pause();
                this.outgoing.filter(NatsMessage::isProtocolFilterOnStop);
            }
            finally {
                this.startStopLock.unlock();
            }
        }
        return this.stopped;
    }

    boolean isRunning() {
        return this.running.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats) throws IOException {
        this.writerLock.lock();
        try {
            int sendPosition = 0;
            int sbl = this.sendBufferLength.get();
            while (msg != null) {
                long size = msg.getSizeInBytes();
                if ((long)sendPosition + size > (long)sbl) {
                    if (sendPosition > 0) {
                        dataPort.write(this.sendBuffer, sendPosition);
                        this.connection.getStatisticsCollector().registerWrite(sendPosition);
                        sendPosition = 0;
                    }
                    if (size > (long)sbl) {
                        sbl = BuilderBase.bufferAllocSize((int)size, 256);
                        this.sendBufferLength.set(sbl);
                        this.sendBuffer = new byte[sbl];
                    }
                }
                ByteArrayBuilder bab = msg.getProtocolBab();
                int babLen = bab.length();
                System.arraycopy(bab.internalArray(), 0, this.sendBuffer, sendPosition, babLen);
                sendPosition += babLen;
                this.sendBuffer[sendPosition++] = 13;
                this.sendBuffer[sendPosition++] = 10;
                if (!msg.isProtocol()) {
                    sendPosition += msg.copyNotEmptyHeaders(sendPosition, this.sendBuffer);
                    byte[] bytes = msg.getData();
                    if (bytes.length > 0) {
                        System.arraycopy(bytes, 0, this.sendBuffer, sendPosition, bytes.length);
                        sendPosition += bytes.length;
                    }
                    this.sendBuffer[sendPosition++] = 13;
                    this.sendBuffer[sendPosition++] = 10;
                }
                stats.incrementOutMsgs();
                stats.incrementOutBytes(size);
                if (msg.flushImmediatelyAfterPublish) {
                    dataPort.flush();
                }
                msg = msg.next;
            }
            if (sendPosition > 0) {
                dataPort.write(this.sendBuffer, sendPosition);
                this.connection.getStatisticsCollector().registerWrite(sendPosition);
            }
        }
        finally {
            this.writerLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Duration outgoingTimeout = Duration.ofMinutes(2L);
        Duration reconnectTimeout = Duration.ofMillis(1L);
        try {
            this.dataPort = this.dataPortFuture.get();
            StatisticsCollector stats = this.connection.getStatisticsCollector();
            while (this.running.get() && !Thread.interrupted()) {
                NatsMessage msg = this.reconnectMode.get() ? this.reconnectOutgoing.accumulate(this.sendBufferLength.get(), 1000L, reconnectTimeout) : this.outgoing.accumulate(this.sendBufferLength.get(), 1000L, outgoingTimeout);
                if (msg == null) continue;
                this.sendMessageBatch(msg, this.dataPort, stats);
            }
        }
        catch (IOException | BufferOverflowException io) {
            if (this.running.get()) {
                this.connection.handleCommunicationIssue(io);
            }
        }
        catch (CancellationException | ExecutionException io) {
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        finally {
            this.running.set(false);
        }
    }

    void setReconnectMode(boolean tf) {
        this.reconnectMode.set(tf);
    }

    boolean canQueueDuringReconnect(NatsMessage msg) {
        return this.reconnectBufferSize < 0L || this.outgoing.sizeInBytes() + msg.getSizeInBytes() < this.reconnectBufferSize;
    }

    boolean queue(NatsMessage msg) {
        return this.outgoing.push(msg);
    }

    void queueInternalMessage(NatsMessage msg) {
        if (this.reconnectMode.get()) {
            this.reconnectOutgoing.push(msg);
        } else {
            this.outgoing.push(msg, true);
        }
    }

    void flushBuffer() {
        this.writerLock.lock();
        try {
            if (this.running.get()) {
                this.dataPort.flush();
            }
        }
        catch (Exception exception) {
        }
        finally {
            this.writerLock.unlock();
        }
    }

    long outgoingPendingMessageCount() {
        return this.outgoing.length();
    }

    long outgoingPendingBytes() {
        return this.outgoing.sizeInBytes();
    }
}

