/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Options;
import io.nats.client.impl.DataPort;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.ByteArrayBuilder;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Background task that drains queued outgoing messages to the connection's {@link DataPort}.
 *
 * <p>Messages are serialized into one of two byte buffers: the regular send buffer, or the
 * reconnect send buffer that {@link #queueInternalMessage(NatsMessage)} uses while reconnect
 * mode is on. The writer task started by {@link #start(Future)} writes whichever buffer matches
 * the current mode.
 *
 * <p>Thread-safety: buffer contents and the queued-message counters are only modified while
 * holding {@code buffersAccessLock}. The counters and the data port reference are
 * {@code volatile} because they are also read without that lock.
 */
public class NatsConnectionWriter implements Runnable {
    /** Maximum time, in milliseconds, one batching cycle waits before writing what is queued. */
    private static final int TOTAL_SLEEP = 40;
    /** Length, in milliseconds, of each individual nap inside a batching cycle. */
    private static final int EACH_SLEEP = 4;
    /** Number of queued messages at which the writer stops waiting and writes immediately. */
    private static final int MAX_BEFORE_FLUSH = 10;

    private final NatsConnection connection;
    // Guarded by startStopLock.
    private Future<Boolean> stopped;
    private Future<DataPort> dataPortFuture;
    // Assigned by the writer task, read by flushBuffer() from other threads.
    private volatile DataPort dataPort = null;
    private final AtomicBoolean running;
    private final AtomicBoolean reconnectMode;
    private final ReentrantLock startStopLock;
    private final ByteArrayBuilder regularSendBuffer;
    private final ByteArrayBuilder reconnectSendBuffer;
    private final int discardMessageCountThreshold;
    private final long reconnectBufferSize;
    private final ReentrantLock buffersAccessLock;
    // Written only under buffersAccessLock; volatile so unlocked reads see current values.
    private volatile long regularQueuedMessageCount;
    private volatile long reconnectQueuedMessageCount;

    /**
     * Creates a writer in the stopped state, sizing both send buffers from the connection options.
     *
     * @param connection the owning connection; supplies options, executor and statistics
     */
    NatsConnectionWriter(NatsConnection connection) {
        this.connection = connection;
        this.running = new AtomicBoolean(false);
        this.reconnectMode = new AtomicBoolean(false);
        this.startStopLock = new ReentrantLock();
        // Not started yet, so the "stopped" future is already complete.
        this.stopped = CompletableFuture.completedFuture(Boolean.TRUE);
        Options options = connection.getOptions();
        int bufSize = options.getBufferSize();
        this.regularSendBuffer = new ByteArrayBuilder(bufSize);
        this.reconnectSendBuffer = new ByteArrayBuilder(bufSize);
        // Without the discard option the threshold is effectively unlimited.
        this.discardMessageCountThreshold = options.isDiscardMessagesWhenOutgoingQueueFull()
                ? options.getMaxMessagesInOutgoingQueue()
                : Integer.MAX_VALUE;
        this.reconnectBufferSize = options.getReconnectBufferSize();
        this.buffersAccessLock = new ReentrantLock();
        this.regularQueuedMessageCount = 0L;
        this.reconnectQueuedMessageCount = 0L;
    }

    /**
     * Marks the writer as running and submits it to the connection's executor.
     *
     * @param dataPortFuture future that yields the data port the writer task will write to
     */
    void start(Future<DataPort> dataPortFuture) {
        this.startStopLock.lock();
        try {
            this.dataPortFuture = dataPortFuture;
            this.running.set(true);
            this.stopped = this.connection.getExecutor().submit(this, Boolean.TRUE);
        }
        finally {
            this.startStopLock.unlock();
        }
    }

    /**
     * Asks the writer task to stop.
     *
     * @return the future of the most recently started writer task (already complete if the
     *         writer was never started)
     */
    Future<Boolean> stop() {
        // Same lock as start(), so the returned future always matches the task being stopped.
        this.startStopLock.lock();
        try {
            this.running.set(false);
            return this.stopped;
        }
        finally {
            this.startStopLock.unlock();
        }
    }

    /**
     * Writer loop: waits for the data port, then repeatedly batches and writes queued messages
     * until {@link #stop()} is called or a write fails.
     *
     * <p>A write happens once {@code MAX_BEFORE_FLUSH} messages are queued or after
     * {@code TOTAL_SLEEP} milliseconds, whichever comes first. I/O failures are reported to the
     * connection through {@code handleCommunicationIssue}.
     */
    @Override
    public void run() {
        try {
            this.dataPort = this.dataPortFuture.get();
            while (this.running.get()) {
                boolean rmode = this.reconnectMode.get();
                if (this.awaitBatch(rmode)) {
                    this.writeQueued(rmode);
                }
            }
        }
        catch (IOException | BufferOverflowException io) {
            this.connection.handleCommunicationIssue(io);
        }
        catch (InterruptedException interrupted) {
            // Interrupted while waiting for the data port: exit, keeping the interrupt visible.
            Thread.currentThread().interrupt();
        }
        catch (CancellationException | ExecutionException ignored) {
            // The data port never became available; there is nothing to write to.
        }
        finally {
            this.running.set(false);
        }
    }

    /**
     * Waits, in {@code EACH_SLEEP} steps for at most {@code TOTAL_SLEEP} milliseconds, until
     * enough messages are queued for the given mode. The wait budget starts fresh on every call
     * and the count is re-read after every nap, so an idle writer sleeps instead of spinning.
     *
     * @param rmode {@code true} to watch the reconnect counter, {@code false} for the regular one
     * @return {@code true} if at least one message is queued and the writer is still running
     */
    private boolean awaitBatch(boolean rmode) {
        int waited = 0;
        long pending = this.queuedCount(rmode);
        while (pending < MAX_BEFORE_FLUSH && waited < TOTAL_SLEEP && this.running.get()) {
            try {
                Thread.sleep(EACH_SLEEP);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored, as before: shutdown is signalled through the running
                // flag, so an interrupt only cuts this nap short.
            }
            waited += EACH_SLEEP;
            pending = this.queuedCount(rmode);
        }
        return pending > 0L && this.running.get();
    }

    /** Returns the queued-message counter for the given mode. */
    private long queuedCount(boolean rmode) {
        return rmode ? this.reconnectQueuedMessageCount : this.regularQueuedMessageCount;
    }

    /**
     * Writes the whole buffer selected by {@code rmode} to the data port, then clears the buffer
     * and resets its message counter.
     *
     * @param rmode {@code true} for the reconnect buffer, {@code false} for the regular one
     * @throws IOException if the data port write fails
     */
    private void writeQueued(boolean rmode) throws IOException {
        this.buffersAccessLock.lock();
        try {
            ByteArrayBuilder bab = rmode ? this.reconnectSendBuffer : this.regularSendBuffer;
            int byteCount = bab.length();
            this.dataPort.write(bab.internalArray(), byteCount);
            bab.clear();
            this.connection.getNatsStatistics().registerWrite(byteCount);
            if (rmode) {
                this.reconnectQueuedMessageCount = 0L;
            } else {
                this.regularQueuedMessageCount = 0L;
            }
        }
        finally {
            this.buffersAccessLock.unlock();
        }
    }

    /**
     * Switches the writer between the regular and the reconnect send buffer.
     *
     * @param reconnectMode {@code true} to write from the reconnect buffer
     */
    void setReconnectMode(boolean reconnectMode) {
        this.reconnectMode.set(reconnectMode);
    }

    /**
     * Tells whether the message still fits within the configured reconnect buffer size.
     *
     * @param msg the message about to be queued
     * @return {@code true} if the reconnect buffer size is negative (no limit), or if the regular
     *         buffer's current length plus the message size stays below that size
     */
    boolean canQueueDuringReconnect(NatsMessage msg) {
        // NOTE(review): the buffer length is read without buffersAccessLock, so this is a
        // best-effort check against concurrent queueing.
        return this.reconnectBufferSize < 0L || (long)this.regularSendBuffer.length() + msg.getSizeInBytes() < this.reconnectBufferSize;
    }

    /**
     * Queues a message on the regular send buffer unless the discard threshold has been reached.
     *
     * @param msg the message to queue
     * @return {@code false} if the message was rejected because the threshold was reached,
     *         {@code true} if it was queued
     */
    boolean queue(NatsMessage msg) {
        if (this.regularQueuedMessageCount >= (long)this.discardMessageCountThreshold) {
            return false;
        }
        this._queue(msg, this.regularSendBuffer);
        return true;
    }

    /**
     * Queues a message on the buffer matching the current mode, without a threshold check.
     *
     * @param msg the message to queue
     */
    void queueInternalMessage(NatsMessage msg) {
        this._queue(msg, this.reconnectMode.get() ? this.reconnectSendBuffer : this.regularSendBuffer);
    }

    /**
     * Serializes the message into the given buffer under {@code buffersAccessLock}, bumps the
     * matching counter and records the bytes added in the connection statistics.
     *
     * @param msg the message to serialize
     * @param bab the target buffer; expected to be one of this writer's two send buffers
     */
    void _queue(NatsMessage msg, ByteArrayBuilder bab) {
        this.buffersAccessLock.lock();
        try {
            long startSize = bab.length();
            msg.appendSerialized(bab);
            long added = (long)bab.length() - startSize;
            // The increments are safe on volatile fields: every write holds buffersAccessLock.
            if (bab == this.regularSendBuffer) {
                ++this.regularQueuedMessageCount;
            } else {
                ++this.reconnectQueuedMessageCount;
            }
            this.connection.getNatsStatistics().incrementOutMsgsAndBytes(added);
        }
        finally {
            this.buffersAccessLock.unlock();
        }
    }

    /**
     * Flushes the data port if the writer is running and the port is available. Best effort:
     * any failure is ignored.
     */
    synchronized void flushBuffer() {
        try {
            // running becomes true before the writer task has resolved the data port.
            DataPort port = this.dataPort;
            if (port != null && this.running.get()) {
                port.flush();
            }
        }
        catch (Exception ignored) {
            // Best effort: a failed flush is not reported from here.
        }
    }
}

