/*
 * Decompiled with CFR 0.152.
 */
package com.swirlds.common.merkle.synchronization.streams;

import com.swirlds.common.io.SelfSerializable;
import com.swirlds.common.io.SerializableDataOutputStream;
import com.swirlds.common.merkle.synchronization.settings.ReconnectSettings;
import com.swirlds.common.merkle.synchronization.settings.ReconnectSettingsFactory;
import com.swirlds.common.merkle.synchronization.utility.MerkleSynchronizationException;
import com.swirlds.common.threading.StandardWorkGroup;
import com.swirlds.logging.LogMarker;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Buffers outgoing messages in a bounded queue and writes them to a
 * {@link SerializableDataOutputStream} from a task submitted to a {@link StandardWorkGroup}.
 *
 * <p>Producers hand messages over with {@link #sendAsync(SelfSerializable)}; the task started by
 * {@link #start()} executes {@link #run()}, which serializes queued messages and flushes the
 * underlying stream. Calling {@link #close()} only marks the stream as no longer alive; the
 * {@link #run()} loop keeps going until the queue is empty.
 *
 * @param <T> the type of message written to the stream
 */
public class AsyncOutputStream<T extends SelfSerializable> implements AutoCloseable {
    private static final Logger LOG = LogManager.getLogger(AsyncOutputStream.class);

    /** The stream that messages are serialized to. */
    private final SerializableDataOutputStream outputStream;

    /** Messages accepted by {@link #sendAsync(SelfSerializable)} that have not been serialized yet. */
    private final BlockingQueue<T> outgoingMessages;

    /** Restarted on every call to {@link #flush()}. */
    private final StopWatch timeSinceLastFlush;

    /** Elapsed time, in milliseconds, after which {@link #flushIfRequired()} triggers a flush. */
    private final int flushIntervalMs;

    /** Set to {@code false} by {@link #close()} or when {@link #run()} is interrupted while sleeping. */
    private volatile boolean alive;

    /** Number of messages serialized since the last flush that actually wrote to the stream. */
    private int bufferedMessageCount;

    /** Maximum time, in milliseconds, that {@link #sendAsync(SelfSerializable)} waits for queue space. */
    private final int timeoutMs;

    /** The work group used to execute {@link #run()}. */
    private final StandardWorkGroup workGroup;

    /**
     * Creates a new stream. The queue capacity, flush interval and send timeout are read from
     * the {@link ReconnectSettings} returned by {@link ReconnectSettingsFactory#get()}.
     *
     * @param outputStream the stream that messages are serialized to
     * @param workGroup    the work group that will execute the writing task
     */
    public AsyncOutputStream(SerializableDataOutputStream outputStream, StandardWorkGroup workGroup) {
        ReconnectSettings settings = ReconnectSettingsFactory.get();
        this.outputStream = outputStream;
        this.workGroup = workGroup;
        this.outgoingMessages = new LinkedBlockingQueue<>(settings.getAsyncStreamBufferSize());
        this.alive = true;
        this.timeSinceLastFlush = new StopWatch();
        this.timeSinceLastFlush.start();
        this.flushIntervalMs = settings.getAsyncOutputStreamFlushMilliseconds();
        this.timeoutMs = settings.getAsyncStreamTimeoutMilliseconds();
    }

    /**
     * Submits {@link #run()} to the work group under the name {@code "async-output-stream"}.
     */
    public void start() {
        this.workGroup.execute("async-output-stream", this::run);
    }

    /**
     * @return the flush interval, in milliseconds
     */
    public int getFlushIntervalMs() {
        return this.flushIntervalMs;
    }

    /**
     * @return {@code true} until {@link #close()} is called or {@link #run()} is interrupted while sleeping
     */
    public boolean isAlive() {
        return this.alive;
    }

    /**
     * @return the underlying output stream
     */
    protected SerializableDataOutputStream getOutputStream() {
        return this.outputStream;
    }

    /**
     * @return the queue of messages waiting to be serialized
     */
    protected BlockingQueue<T> getOutgoingMessages() {
        return this.outgoingMessages;
    }

    /**
     * Serializes queued messages and flushes the stream until this stream is no longer alive and
     * the queue is empty, or until the current thread is interrupted. A final flush is performed
     * when the loop exits through its condition.
     *
     * @throws MerkleSynchronizationException if serializing or flushing throws an {@link IOException}
     */
    public void run() {
        while ((this.isAlive() || !this.outgoingMessages.isEmpty()) && !Thread.currentThread().isInterrupted()) {
            this.flushIfRequired();
            // Only sleep when there was nothing to serialize and nothing buffered to flush.
            if (this.handleNextMessage() || this.flush()) {
                continue;
            }
            try {
                Thread.sleep(0L, 1);
            } catch (InterruptedException e) {
                LOG.warn(LogMarker.RECONNECT.getMarker(), "AsyncOutputStream interrupted");
                this.alive = false;
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
                return;
            }
        }
        this.flush();
    }

    /**
     * Enqueues a message for writing, waiting up to the configured timeout for queue space.
     * If the timeout elapses, the underlying stream is closed and an exception is thrown.
     *
     * @param message the message to send
     * @throws InterruptedException           if interrupted while waiting for queue space
     * @throws MerkleSynchronizationException if this stream is no longer alive, if the wait timed
     *                                        out, or if closing the stream after a timeout failed
     */
    public void sendAsync(T message) throws InterruptedException {
        if (!this.isAlive()) {
            throw new MerkleSynchronizationException("Messages can not be sent after close has been called.");
        }
        boolean accepted = this.outgoingMessages.offer(message, this.timeoutMs, TimeUnit.MILLISECONDS);
        if (accepted) {
            return;
        }
        try {
            this.outputStream.close();
        } catch (IOException e) {
            throw new MerkleSynchronizationException("Unable to close stream", e);
        }
        throw new MerkleSynchronizationException("Timed out waiting to send data");
    }

    /**
     * Marks this stream as no longer alive. This method does not touch the underlying stream or
     * the queue.
     */
    @Override
    public void close() {
        this.alive = false;
    }

    /**
     * Takes the next queued message, if any, and serializes it.
     *
     * @return {@code true} if a message was serialized, {@code false} if the queue was empty
     * @throws MerkleSynchronizationException if serialization throws an {@link IOException}
     */
    private boolean handleNextMessage() {
        // poll() tests and dequeues in one step, so an empty queue yields null rather than the
        // NoSuchElementException that isEmpty() followed by remove() could raise. Keeping the
        // element typed as T lets it be passed to serializeMessage(T) without a cast.
        T message = this.outgoingMessages.poll();
        if (message == null) {
            return false;
        }
        try {
            this.serializeMessage(message);
        } catch (IOException e) {
            throw new MerkleSynchronizationException(e);
        }
        ++this.bufferedMessageCount;
        return true;
    }

    /**
     * Writes a single message to the underlying stream. Subclasses may override this method.
     *
     * @param message the message to serialize
     * @throws IOException if the message's {@code serialize} call throws
     */
    protected void serializeMessage(T message) throws IOException {
        message.serialize(this.outputStream);
    }

    /**
     * Restarts the flush timer and, if any messages were serialized since the last effective
     * flush, flushes the underlying stream.
     *
     * @return {@code true} if the stream was flushed, {@code false} if there was nothing buffered
     * @throws MerkleSynchronizationException if flushing throws an {@link IOException}
     */
    private boolean flush() {
        // The timer is restarted even when nothing is buffered.
        this.timeSinceLastFlush.reset();
        this.timeSinceLastFlush.start();
        if (this.bufferedMessageCount <= 0) {
            return false;
        }
        try {
            this.outputStream.flush();
        } catch (IOException e) {
            throw new MerkleSynchronizationException(e);
        }
        this.bufferedMessageCount = 0;
        return true;
    }

    /**
     * Flushes when more than {@link #flushIntervalMs} milliseconds have elapsed on the flush timer.
     */
    private void flushIfRequired() {
        if (this.timeSinceLastFlush.getTime(TimeUnit.MILLISECONDS) > (long) this.flushIntervalMs) {
            this.flush();
        }
    }
}

