/*
 * Decompiled with CFR 0.152.
 */
package com.swirlds.common.stream;

import com.swirlds.common.crypto.DigestType;
import com.swirlds.common.crypto.Hash;
import com.swirlds.common.crypto.ImmutableHash;
import com.swirlds.common.crypto.SerializableHashable;
import com.swirlds.common.stream.EventStreamType;
import com.swirlds.common.stream.HashCalculatorForStream;
import com.swirlds.common.stream.MultiStream;
import com.swirlds.common.stream.QueueThreadObjectStream;
import com.swirlds.common.stream.QueueThreadObjectStreamConfiguration;
import com.swirlds.common.stream.RunningHashCalculatorForStream;
import com.swirlds.common.stream.Signer;
import com.swirlds.common.stream.StreamAligned;
import com.swirlds.common.stream.Timestamped;
import com.swirlds.common.stream.internal.TimestampStreamFileWriter;
import com.swirlds.common.system.NodeId;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.logging.LogMarker;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.function.Predicate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;

public class EventStreamManager<T extends StreamAligned & Timestamped & SerializableHashable> {
    private static final Logger logger = LogManager.getLogger(EventStreamManager.class);
    private final MultiStream<T> multiStream;
    private final Predicate<T> isLastEventInFreezeCheck;
    private QueueThreadObjectStream<T> hashQueueThread;
    private HashCalculatorForStream<T> hashCalculator;
    private QueueThreadObjectStream<T> writeQueueThread;
    private TimestampStreamFileWriter<T> streamFileWriter;
    private Hash initialHash = new ImmutableHash(new byte[DigestType.SHA_384.digestLength()]);
    private volatile boolean freezePeriodStarted = false;

    public EventStreamManager(ThreadManager threadManager, NodeId selfId, Signer signer, String nodeName, boolean enableEventStreaming, String eventsLogDir, long eventsLogPeriod, int eventStreamQueueCapacity, Predicate<T> isLastEventInFreezeCheck) throws NoSuchAlgorithmException, IOException {
        if (enableEventStreaming) {
            String eventStreamDir = eventsLogDir + "/events_" + nodeName;
            Files.createDirectories(Paths.get(eventStreamDir, new String[0]), new FileAttribute[0]);
            this.streamFileWriter = new TimestampStreamFileWriter(eventStreamDir, eventsLogPeriod * 1000L, signer, false, EventStreamType.getInstance());
            this.writeQueueThread = new QueueThreadObjectStreamConfiguration(threadManager).setNodeId(selfId.getId()).setComponent("event-stream").setThreadName("write-queue").setForwardTo(this.streamFileWriter).build();
            this.writeQueueThread.start();
        }
        RunningHashCalculatorForStream runningHashCalculator = new RunningHashCalculatorForStream();
        this.hashCalculator = new HashCalculatorForStream(runningHashCalculator);
        this.hashQueueThread = new QueueThreadObjectStreamConfiguration(threadManager).setNodeId(selfId.getId()).setComponent("event-stream").setThreadName("hash-queue").setForwardTo(this.hashCalculator).build();
        this.hashQueueThread.start();
        this.multiStream = new MultiStream(enableEventStreaming ? List.of(this.hashQueueThread, this.writeQueueThread) : List.of(this.hashQueueThread));
        this.multiStream.setRunningHash(this.initialHash);
        this.isLastEventInFreezeCheck = isLastEventInFreezeCheck;
    }

    public EventStreamManager(MultiStream<T> multiStream, Predicate<T> isLastEventInFreezeCheck) {
        this.multiStream = multiStream;
        multiStream.setRunningHash(this.initialHash);
        this.isLastEventInFreezeCheck = isLastEventInFreezeCheck;
    }

    public void stop() {
        this.writeQueueThread.stop();
        this.hashQueueThread.stop();
        this.streamFileWriter.close();
        this.multiStream.close();
    }

    public void addEvents(List<T> events) {
        events.forEach(x$0 -> this.addEvent((StreamAligned)x$0));
    }

    public void addEvent(T event) {
        if (!this.freezePeriodStarted) {
            this.multiStream.addObject(event);
            if (this.isLastEventInFreezeCheck.test(event)) {
                this.freezePeriodStarted = true;
                Supplier[] supplierArray = new Supplier[1];
                supplierArray[0] = ((Timestamped)event)::getTimestamp;
                logger.info(LogMarker.EVENT_STREAM.getMarker(), "ConsensusTimestamp of the last Event to be written into file before restarting: {}", supplierArray);
                this.multiStream.close();
            }
        } else {
            logger.warn(LogMarker.EVENT_STREAM.getMarker(), "Event {} dropped after freezePeriodStarted!", (Object)((Timestamped)event).getTimestamp());
        }
    }

    public void setStartWriteAtCompleteWindow(boolean startWriteAtCompleteWindow) {
        if (this.streamFileWriter != null) {
            this.streamFileWriter.setStartWriteAtCompleteWindow(startWriteAtCompleteWindow);
        }
    }

    public int getHashQueueSize() {
        return this.hashQueueThread.getQueue().size();
    }

    public int getEventStreamingQueueSize() {
        return this.writeQueueThread == null ? 0 : this.writeQueueThread.getQueue().size();
    }

    public MultiStream<T> getMultiStream() {
        return this.multiStream;
    }

    public TimestampStreamFileWriter<T> getStreamFileWriter() {
        return this.streamFileWriter;
    }

    public HashCalculatorForStream<T> getHashCalculator() {
        return this.hashCalculator;
    }

    public boolean getFreezePeriodStarted() {
        return this.freezePeriodStarted;
    }

    public Hash getInitialHash() {
        return new Hash(this.initialHash);
    }

    public void setInitialHash(Hash initialHash) {
        this.initialHash = initialHash;
        logger.info(LogMarker.EVENT_STREAM.getMarker(), "EventStreamManager::setInitialHash: {}", new Supplier[]{() -> initialHash});
        this.multiStream.setRunningHash(initialHash);
    }
}

