/*
 * Decompiled with CFR 0.152.
 */
package org.broadinstitute.hellbender.utils.runtime;

import com.google.common.io.Files;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.runtime.AsynchronousStreamWriter;
import org.broadinstitute.hellbender.utils.runtime.CapturedStreamOutput;
import org.broadinstitute.hellbender.utils.runtime.CapturedStreamOutputSnapshot;
import org.broadinstitute.hellbender.utils.runtime.ProcessControllerAckResult;
import org.broadinstitute.hellbender.utils.runtime.ProcessControllerBase;
import org.broadinstitute.hellbender.utils.runtime.ProcessOutput;
import org.broadinstitute.hellbender.utils.runtime.ProcessSettings;
import org.broadinstitute.hellbender.utils.runtime.StreamOutput;
import org.broadinstitute.hellbender.utils.runtime.StreamingToolConstants;

public final class StreamingProcessController
extends ProcessControllerBase<CapturedStreamOutputSnapshot> {
    private static final Logger logger = LogManager.getLogger(StreamingProcessController.class);
    private final ProcessSettings settings;
    private static String ACK_FIFO_FILE_NAME = "gatkStreamingControllerAck.fifo";
    private static String DATA_FIFO_FILE_NAME = "gatkStreamingControllerData.fifo";
    private File fifoTempDir = null;
    private File ackFIFOFile = null;
    private File dataFIFOFile = null;
    private InputStream ackFIFOInputStream;
    private Future<ProcessControllerAckResult> ackFuture;
    private ProcessJournal processJournal = new ProcessJournal();
    private OutputStream processStdinStream;
    private static final int REMOTE_PROCESS_TERMINATION_TIMEOUT_SECONDS = 30;

    public StreamingProcessController(ProcessSettings settings) {
        this(settings, false);
    }

    public StreamingProcessController(ProcessSettings settings, boolean enableJournaling) {
        Utils.nonNull(settings, "Process settings are required");
        this.settings = settings;
        if (enableJournaling) {
            this.processJournal.enable(settings.getCommandString());
        }
    }

    public File start() {
        if (this.process != null) {
            throw new IllegalStateException("This controller is already running a process");
        }
        this.process = this.launchProcess(this.settings);
        this.startListeners();
        this.processStdinStream = this.getProcess().getOutputStream();
        this.fifoTempDir = Files.createTempDir();
        this.ackFIFOFile = this.createFIFOFile(ACK_FIFO_FILE_NAME);
        return this.ackFIFOFile;
    }

    public void writeProcessInput(String line) {
        try {
            if (this.stdErrFuture != null && this.stdErrFuture.isDone()) {
                this.processJournal.writeLogMessage("Dropping stale stderr output before send: \n" + ((CapturedStreamOutputSnapshot)this.stdErrFuture.get()).getBufferString() + "\n");
                this.stdErrFuture = null;
            }
            if (this.stdOutFuture != null && this.stdOutFuture.isDone()) {
                this.processJournal.writeLogMessage("Dropping stale stdout output before send: \n" + ((CapturedStreamOutputSnapshot)this.stdOutFuture.get()).getBufferString() + "\n");
                this.stdOutFuture = null;
            }
        }
        catch (InterruptedException e) {
            throw new GATKException(String.format("Interrupted retrieving stale future: " + line, e));
        }
        catch (ExecutionException e) {
            throw new GATKException(String.format("Execution exception retrieving stale future: " + line, e));
        }
        this.startListeners();
        try {
            this.processStdinStream.write(line.getBytes());
            this.processStdinStream.flush();
            this.processJournal.writeOutbound(line);
        }
        catch (IOException e) {
            throw new GATKException(String.format("Error writing (%s) to stdin on command", line), e);
        }
    }

    private StreamOutput drainOutputStream(Future<CapturedStreamOutputSnapshot> streamOutputFuture) {
        StreamOutput ret = null;
        if (streamOutputFuture != null) {
            try {
                if (streamOutputFuture.isDone()) {
                    ret = streamOutputFuture.get();
                }
            }
            catch (InterruptedException e) {
                throw new GATKException("InterruptedException attempting to retrieve output from remote process", e);
            }
            catch (ExecutionException e) {
                throw new GATKException("ExecutionException attempting to retrieve output from remote process", e);
            }
        }
        return ret;
    }

    public ProcessOutput getProcessOutput() {
        StreamOutput stdout;
        StreamOutput stderr = this.drainOutputStream(this.stdErrFuture);
        if (stderr != null) {
            this.stdErrFuture = null;
        }
        if ((stdout = this.drainOutputStream(this.stdOutFuture)) != null) {
            this.stdOutFuture = null;
        }
        this.processJournal.writeInbound(stdout, stderr);
        this.startListeners();
        return new ProcessOutput(0, stdout, stderr);
    }

    public void openAckFIFOForRead() {
        try {
            this.ackFIFOInputStream = new FileInputStream(this.ackFIFOFile);
        }
        catch (FileNotFoundException e) {
            throw new GATKException("Can't open ack FIFO for read");
        }
    }

    public ProcessControllerAckResult waitForAck() {
        if (this.ackFuture != null) {
            throw new GATKException("An ack request is already outstanding");
        }
        this.ackFuture = executorService.submit(() -> {
            try {
                String ackMessage = this.getBytesFromStream(StreamingToolConstants.STREAMING_ACK_MESSAGE_SIZE);
                if (ackMessage.equals(StreamingToolConstants.STREAMING_ACK_MESSAGE)) {
                    return new ProcessControllerAckResult(true);
                }
                if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_MESSAGE)) {
                    return new ProcessControllerAckResult(false);
                }
                if (ackMessage.equals(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE)) {
                    return this.getNckWithMessageResult();
                }
                String badAckMessage = "An unrecognized ack string message was written to ack fifo";
                logger.error("An unrecognized ack string message was written to ack fifo");
                return new ProcessControllerAckResult("An unrecognized ack string message was written to ack fifo");
            }
            catch (IOException e) {
                throw new GATKException("IOException reading from ack fifo", e);
            }
        });
        try {
            ProcessControllerAckResult pcAck = this.ackFuture.get();
            this.processJournal.writeLogMessage(pcAck.getDisplayMessage());
            this.ackFuture = null;
            return pcAck;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new GATKException("Exception waiting for ack from Python: " + e.getMessage(), e);
        }
    }

    private ProcessControllerAckResult getNckWithMessageResult() throws IOException {
        String messageLengthString = this.getBytesFromStream(StreamingToolConstants.STREAMING_NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE);
        int messageLength = Integer.valueOf(messageLengthString);
        if (messageLength < 0) {
            throw new GATKException("Negative ack message length  must be > 0");
        }
        String nckMessage = this.getBytesFromStream(messageLength);
        return new ProcessControllerAckResult(nckMessage);
    }

    private String getBytesFromStream(int expectedMessageLength) throws IOException {
        int nBytesReceived;
        int readLen;
        int nBytesRemaining = expectedMessageLength;
        StringBuilder sb = new StringBuilder();
        for (nBytesReceived = 0; nBytesReceived < expectedMessageLength; nBytesReceived += readLen) {
            byte[] nckMessage = new byte[nBytesRemaining];
            readLen = this.ackFIFOInputStream.read(nckMessage, 0, nBytesRemaining);
            if (readLen <= 0) {
                throw new GATKException(String.format("Expected message of length %d but only found %d bytes", expectedMessageLength, nBytesReceived));
            }
            sb.append(new String(nckMessage, 0, readLen));
            nBytesRemaining -= readLen;
        }
        if (nBytesReceived != expectedMessageLength) {
            throw new GATKException(String.format("Expected message of length %d but found %d", expectedMessageLength, nBytesReceived));
        }
        return sb.toString();
    }

    public File createDataFIFO() {
        if (this.dataFIFOFile != null) {
            throw new IllegalArgumentException("Only one data FIFO per controller is supported");
        }
        this.dataFIFOFile = this.createFIFOFile(DATA_FIFO_FILE_NAME);
        return this.dataFIFOFile;
    }

    private File createFIFOFile(String fifoName) {
        String fifoTempFileName = String.format("%s/%s", this.fifoTempDir.getAbsolutePath(), fifoName);
        return IOUtils.createFifoFile(IOUtils.getPath(fifoTempFileName), true);
    }

    public <T> AsynchronousStreamWriter<T> getAsynchronousStreamWriter(OutputStream outputStream, Function<T, ByteArrayOutputStream> itemSerializer) {
        Utils.nonNull(outputStream);
        Utils.nonNull(itemSerializer);
        return new AsynchronousStreamWriter<T>(executorService, outputStream, itemSerializer);
    }

    private void closeFIFOs() {
        if (this.dataFIFOFile != null) {
            this.dataFIFOFile.delete();
        }
        if (this.ackFIFOFile != null) {
            this.ackFIFOFile.delete();
        }
        this.fifoTempDir.delete();
    }

    private void startListeners() {
        if (this.stdOutFuture == null) {
            this.stdOutFuture = executorService.submit(new ProcessControllerBase.OutputCapture((ProcessControllerBase)this, (CapturedStreamOutput)new CapturedStreamOutputSnapshot(this.settings.getStdoutSettings(), this.process.getInputStream(), System.out), ProcessControllerBase.ProcessStream.STDOUT, this.getClass().getSimpleName(), this.controllerId));
        }
        if (!this.settings.isRedirectErrorStream() && this.stdErrFuture == null) {
            this.stdErrFuture = executorService.submit(new ProcessControllerBase.OutputCapture((ProcessControllerBase)this, (CapturedStreamOutput)new CapturedStreamOutputSnapshot(this.settings.getStderrSettings(), this.process.getErrorStream(), System.err), ProcessControllerBase.ProcessStream.STDERR, this.getClass().getSimpleName(), this.controllerId));
        }
    }

    @Override
    protected void tryCleanShutdown() {
        boolean isCancelled;
        if (this.stdErrFuture != null && !this.stdErrFuture.isDone() && !(isCancelled = this.stdErrFuture.cancel(true))) {
            logger.error("Failure cancelling stderr task");
        }
        if (this.stdOutFuture != null && !this.stdOutFuture.isDone() && !(isCancelled = this.stdOutFuture.cancel(true))) {
            logger.error("Failure cancelling stdout task");
        }
        if (this.process != null) {
            org.apache.commons.io.IOUtils.closeQuietly((OutputStream)this.process.getOutputStream());
        }
    }

    public void terminate() {
        this.closeFIFOs();
        this.tryCleanShutdown();
        boolean exited = false;
        try {
            exited = this.process.waitFor(30L, TimeUnit.SECONDS);
            this.processJournal.close();
            if (!exited) {
                this.process.destroy();
                this.process.waitFor(30L, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            logger.error(String.format("Interrupt exception waiting for process (%s) to terminate", this.settings.getCommandString()));
        }
        if (this.process.isAlive()) {
            throw new GATKException("Failure terminating remote process");
        }
    }

    private class ProcessJournal {
        private File journalingFile = null;
        private FileWriter journalingFileWriter;
        private boolean cleanUpJournal = false;

        private ProcessJournal() {
        }

        public void enable(String commandString) {
            this.enable(commandString, false);
        }

        public void enable(String commandString, boolean cleanUpJournal) {
            String journalingFileName = String.format("gatkStreamingProcessJournal-%d.txt", new Random().nextInt());
            this.journalingFile = new File(journalingFileName);
            this.cleanUpJournal = cleanUpJournal;
            try {
                this.journalingFileWriter = new FileWriter(this.journalingFile);
                this.journalingFileWriter.write("Initial process command line: ");
                this.journalingFileWriter.write(StreamingProcessController.this.settings.getCommandString() + "\n\n");
            }
            catch (IOException e) {
                throw new GATKException(String.format("Error creating streaming process journaling file %s for command \"%s\"", commandString, this.journalingFile.getAbsolutePath()), e);
            }
            logger.info(String.format("Enabling streaming process journaling file %s", journalingFileName));
        }

        public void writeOutbound(String line) {
            try {
                if (this.journalingFileWriter != null) {
                    this.journalingFileWriter.write("Sending: \n[");
                    this.journalingFileWriter.write(line);
                    this.journalingFileWriter.write("]\n\n");
                    this.journalingFileWriter.flush();
                }
            }
            catch (IOException e) {
                throw new GATKException("Error writing to output to process journal", e);
            }
        }

        public void writeInbound(StreamOutput stdout, StreamOutput stderr) {
            if (this.journalingFileWriter != null) {
                try {
                    if (stdout != null) {
                        this.journalingFileWriter.write("Received from stdout: [");
                        this.journalingFileWriter.write(stdout.getBufferString());
                        this.journalingFileWriter.write("]\n");
                    }
                    if (stderr != null) {
                        this.journalingFileWriter.write("Received from stderr: [");
                        this.journalingFileWriter.write(stderr.getBufferString());
                        this.journalingFileWriter.write("]\n");
                    }
                    this.journalingFileWriter.write("\n");
                    this.journalingFileWriter.flush();
                }
                catch (IOException e) {
                    throw new GATKException(String.format("Error writing to journaling file %s", this.journalingFile.getAbsolutePath()), e);
                }
            }
        }

        public void writeLogMessage(String message) {
            if (this.journalingFileWriter != null) {
                try {
                    this.journalingFileWriter.write(message);
                    this.journalingFileWriter.flush();
                }
                catch (IOException e) {
                    throw new GATKException(String.format("Error writing to journaling file %s", this.journalingFile.getAbsolutePath()), e);
                }
            }
        }

        public void close() {
            try {
                if (this.journalingFileWriter != null) {
                    this.writeLogMessage("Shutting down journal normally");
                    this.journalingFileWriter.flush();
                    this.journalingFileWriter.close();
                    if (this.cleanUpJournal) {
                        this.journalingFile.delete();
                    }
                }
            }
            catch (IOException e) {
                throw new GATKException(String.format("Error closing streaming process journaling file %s", this.journalingFile.getAbsolutePath()), e);
            }
        }
    }
}

