/*
 * Decompiled with CFR 0.152.
 */
package com.swirlds.common.merkle.synchronization.streams;

import com.swirlds.common.Releasable;
import com.swirlds.common.config.singleton.ConfigurationHolder;
import com.swirlds.common.constructable.RuntimeConstructable;
import com.swirlds.common.io.SelfSerializable;
import com.swirlds.common.io.streams.SerializableDataInputStream;
import com.swirlds.common.merkle.synchronization.config.ReconnectConfig;
import com.swirlds.common.merkle.synchronization.utility.MerkleSynchronizationException;
import com.swirlds.common.threading.pool.StandardWorkGroup;
import com.swirlds.logging.LogMarker;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AsyncInputStream<T extends SelfSerializable>
implements AutoCloseable {
    private static final Logger logger = LogManager.getLogger(AsyncInputStream.class);
    private static final String THREAD_NAME = "async-input-stream";
    private final SerializableDataInputStream inputStream;
    private final AtomicInteger anticipatedMessages;
    private final BlockingQueue<SelfSerializable> receivedMessages;
    private final int pollTimeoutMs;
    private final CountDownLatch finishedLatch;
    private volatile boolean alive;
    private final Supplier<T> messageFactory;
    private final StandardWorkGroup workGroup;

    public AsyncInputStream(SerializableDataInputStream inputStream, StandardWorkGroup workGroup, Supplier<T> messageFactory) {
        ReconnectConfig config = ConfigurationHolder.getConfigData(ReconnectConfig.class);
        this.inputStream = inputStream;
        this.workGroup = workGroup;
        this.messageFactory = messageFactory;
        this.pollTimeoutMs = config.asyncStreamTimeoutMilliseconds();
        this.anticipatedMessages = new AtomicInteger(0);
        this.receivedMessages = new LinkedBlockingQueue<SelfSerializable>(config.asyncStreamBufferSize());
        this.finishedLatch = new CountDownLatch(1);
        this.alive = true;
    }

    public void start() {
        this.workGroup.execute(THREAD_NAME, this::run);
    }

    public boolean isAlive() {
        return this.alive;
    }

    public long getTotalAnticipatedMessages() {
        return this.anticipatedMessages.get();
    }

    private void run() {
        RuntimeConstructable message = null;
        try {
            while (this.isAlive() && !Thread.currentThread().isInterrupted()) {
                int previous = this.anticipatedMessages.getAndUpdate(value -> value == 0 ? 0 : value - 1);
                if (previous == 0) {
                    TimeUnit.MILLISECONDS.sleep(1L);
                    continue;
                }
                message = (SelfSerializable)this.messageFactory.get();
                message.deserialize(this.inputStream, message.getVersion());
                this.receivedMessages.put((SelfSerializable)message);
            }
        }
        catch (IOException e) {
            throw new MerkleSynchronizationException(String.format("Failed to deserialize object with class ID %d(0x%08X) (%s)", message.getClassId(), message.getClassId(), message.getClass().toString()), e);
        }
        catch (InterruptedException e) {
            logger.warn(LogMarker.RECONNECT.getMarker(), "AsyncInputStream interrupted");
            Thread.currentThread().interrupt();
        }
        finally {
            this.finishedLatch.countDown();
        }
    }

    public void anticipateMessage() {
        this.anticipatedMessages.getAndIncrement();
    }

    public T readAnticipatedMessage() throws InterruptedException {
        return this.asyncRead();
    }

    public void abort() {
        this.close();
        try {
            this.finishedLatch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        while (!this.receivedMessages.isEmpty()) {
            SelfSerializable message = (SelfSerializable)this.receivedMessages.remove();
            if (!(message instanceof Releasable)) continue;
            ((Releasable)((Object)message)).release();
        }
    }

    @Override
    public void close() {
        this.alive = false;
    }

    private T asyncRead() throws InterruptedException {
        SelfSerializable data = this.receivedMessages.poll(this.pollTimeoutMs, TimeUnit.MILLISECONDS);
        if (data == null) {
            try {
                this.inputStream.close();
            }
            catch (IOException e) {
                throw new MerkleSynchronizationException("Unable to close stream", e);
            }
            throw new MerkleSynchronizationException("Timed out waiting for data");
        }
        return (T)data;
    }
}

