/*
 * Decompiled with CFR 0.152.
 */
package org.broadinstitute.hellbender.utils.python;

import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.python.PythonExecutorBase;
import org.broadinstitute.hellbender.utils.python.PythonScriptExecutor;
import org.broadinstitute.hellbender.utils.python.PythonScriptExecutorException;
import org.broadinstitute.hellbender.utils.runtime.AsynchronousStreamWriter;
import org.broadinstitute.hellbender.utils.runtime.InputStreamSettings;
import org.broadinstitute.hellbender.utils.runtime.OutputStreamSettings;
import org.broadinstitute.hellbender.utils.runtime.ProcessControllerAckResult;
import org.broadinstitute.hellbender.utils.runtime.ProcessOutput;
import org.broadinstitute.hellbender.utils.runtime.ProcessSettings;
import org.broadinstitute.hellbender.utils.runtime.StreamingProcessController;

/**
 * Executor that launches a Python process through a {@link StreamingProcessController} and drives it
 * by writing Python statements to the process input, using explicit ack requests to synchronize with it.
 *
 * <p>Typical usage: call {@link #start}, optionally {@link #initStreamWriter} followed by
 * {@link #startBatchWrite}/{@link #waitForPreviousBatchCompletion} pairs, issue commands with
 * {@link #sendSynchronousCommand} or {@link #sendAsynchronousCommand}, and finally call {@link #terminate}.
 *
 * <p>At most one ack request may be outstanding at any time; this is tracked with a plain (unsynchronized)
 * boolean field, so instances are presumably intended to be used from a single thread.
 *
 * @param <T> type of the items that are serialized and written to the data transfer FIFO
 */
public class StreamingPythonScriptExecutor<T> extends PythonExecutorBase {
    private static final Logger logger = LogManager.getLogger(StreamingPythonScriptExecutor.class);
    private static final String NL = System.lineSeparator();

    // Python statements (each newline-terminated) that are written to the Python process input.
    private static final String PYTHON_IMPORT_GATK = "from gatktool import tool" + NL;
    private static final String PYTHON_INITIALIZE_GATK = "tool.initializeGATK('%s')" + NL;
    private static final String PYTHON_START_PROFILING = "tool.startProfiling()" + NL;
    private static final String PYTHON_TERMINATE_GATK = "tool.terminateGATK()" + NL;
    private static final String PYTHON_INITIALIZE_DATA_FIFO = "tool.initializeDataFIFO('%s')" + NL;
    private static final String PYTHON_CLOSE_DATA_FIFO = "tool.closeDataFIFO()" + NL;
    private static final String PYTHON_SEND_ACK_REQUEST = "tool.sendAck()" + NL;
    private static final String PYTHON_END_PROFILING = "tool.endProfiling('%s')" + NL;

    // Command line used for the most recent start() call; reported by getApproximateCommandLine().
    private final List<String> curatedCommandLineArgs = new ArrayList<>();
    private StreamingProcessController spController;
    private ProcessSettings processSettings;
    private File dataTransferFIFOFile;
    private FileOutputStream dataTransferFIFOWriter;
    private AsynchronousStreamWriter<T> asyncWriter;
    // When non-null, profiling is started in start() and the results are requested at this path in terminate().
    private File profileResults;
    // True between sendAckRequest() and the matching waitForAck().
    private boolean isAckRequestOutstanding = false;

    /**
     * Create an executor that uses the default {@code PYTHON} executable name.
     *
     * @param ensureExecutableExists forwarded unchanged to the {@link PythonExecutorBase} constructor
     */
    public StreamingPythonScriptExecutor(boolean ensureExecutableExists) {
        this(PythonExecutorBase.PythonExecutableName.PYTHON, ensureExecutableExists);
    }

    /**
     * Create an executor for a specific Python executable name.
     *
     * @param pythonExecutableName   forwarded unchanged to the {@link PythonExecutorBase} constructor
     * @param ensureExecutableExists forwarded unchanged to the {@link PythonExecutorBase} constructor
     */
    public StreamingPythonScriptExecutor(PythonExecutorBase.PythonExecutableName pythonExecutableName, boolean ensureExecutableExists) {
        super(pythonExecutableName, ensureExecutableExists);
    }

    /**
     * Start the Python process with journaling disabled and no profiling.
     *
     * @param pythonProcessArgs additional arguments for the Python command line; may be null
     * @return true if the process was started and the tool was initialized, otherwise false
     */
    public boolean start(List<String> pythonProcessArgs) {
        return this.start(pythonProcessArgs, false, null);
    }

    /**
     * Start the Python process and initialize the {@code gatktool} module inside it.
     *
     * @param pythonProcessArgs additional arguments appended after the executable name, {@code -u} and
     *                          {@code -i}; may be null
     * @param enableJournaling  forwarded unchanged to the {@link StreamingProcessController} constructor
     * @param profileResults    if non-null, profiling is started now and the results are requested at this
     *                          file's absolute path when {@link #terminate} is called
     * @return false if the process controller's {@code start()} returned no ack FIFO file, otherwise true
     */
    public boolean start(List<String> pythonProcessArgs, boolean enableJournaling, File profileResults) {
        PythonScriptExecutor.checkPythonEnvironmentForPackage("gatktool");
        this.profileResults = profileResults;

        List<String> args = new ArrayList<>();
        args.add(this.externalScriptExecutableName);
        args.add("-u");
        args.add("-i");
        if (pythonProcessArgs != null) {
            args.addAll(pythonProcessArgs);
        }

        // Replace, rather than append to, any command line recorded by a previous start() call so that
        // getApproximateCommandLine() always describes only the most recent launch.
        this.curatedCommandLineArgs.clear();
        this.curatedCommandLineArgs.addAll(args);

        InputStreamSettings isSettings = new InputStreamSettings();
        OutputStreamSettings stdOutSettings = new OutputStreamSettings();
        stdOutSettings.setBufferSize(-1);
        OutputStreamSettings stdErrSettings = new OutputStreamSettings();
        stdErrSettings.setBufferSize(-1);

        this.processSettings = new ProcessSettings(args.toArray(new String[0]), false, null, null, isSettings, stdOutSettings, stdErrSettings);
        this.spController = new StreamingProcessController(this.processSettings, enableJournaling);
        File ackFIFOFile = this.spController.start();
        if (ackFIFOFile == null) {
            return false;
        }
        this.initializeTool(ackFIFOFile);
        return true;
    }

    /**
     * Write a newline-terminated Python statement to the process, request an ack, and block until the
     * ack is received.
     *
     * @param line newline-terminated Python code; must not be null
     * @return the process output returned by {@link #waitForAck()}
     * @throws IllegalArgumentException if {@code line} is not terminated with the platform line separator
     */
    public ProcessOutput sendSynchronousCommand(String line) {
        Utils.nonNull(line, "A Python command line must be provided");
        if (!line.endsWith(NL)) {
            throw new IllegalArgumentException("Python commands must be newline-terminated in order to be executed. Indented Python code blocks must be terminated with additional newlines");
        }
        this.spController.writeProcessInput(line);
        this.sendAckRequest();
        return this.waitForAck();
    }

    /**
     * Write a newline-terminated Python statement to the process and request an ack without waiting for
     * it. The caller must subsequently call {@link #waitForAck()} before issuing another ack request.
     *
     * @param line newline-terminated Python code; must not be null
     * @throws IllegalArgumentException if {@code line} is not terminated with the platform line separator
     */
    public void sendAsynchronousCommand(String line) {
        Utils.nonNull(line, "A Python command line must be provided");
        if (!line.endsWith(NL)) {
            throw new IllegalArgumentException("Python commands must be newline-terminated");
        }
        this.spController.writeProcessInput(line);
        this.sendAckRequest();
    }

    /**
     * Block until the outstanding ack request is answered by the Python process.
     *
     * @return the process output obtained from {@link #getAccumulatedOutput()} after the ack arrived
     * @throws GATKException                 if no ack request is outstanding
     * @throws PythonScriptExecutorException if the process answered with a negative ack
     */
    public ProcessOutput waitForAck() {
        if (!this.isAckRequestOutstanding) {
            throw new GATKException("No ack request is outstanding. An ack request must be issued first");
        }
        ProcessControllerAckResult pcAckResult = this.spController.waitForAck();
        this.isAckRequestOutstanding = false;
        // The output is retrieved before the ack result is inspected, i.e. also when a nack was received.
        ProcessOutput po = this.getAccumulatedOutput();
        if (!pcAckResult.isPositiveAck()) {
            throw new PythonScriptExecutorException(String.format("A nack was received from the Python process (most likely caused by a raised exception caused by): %s", pcAckResult.getDisplayMessage()));
        }
        return po;
    }

    /**
     * @return the arguments recorded by the most recent {@link #start} call, joined with single spaces
     */
    @Override
    public String getApproximateCommandLine() {
        return String.join(" ", this.curatedCommandLineArgs);
    }

    /**
     * Create the data transfer FIFO, ask the Python process to initialize it, open it for writing on the
     * Java side, and obtain the asynchronous writer used by {@link #startBatchWrite}.
     *
     * @param itemSerializer function that converts an item into the bytes written to the FIFO; must not be null
     * @throws GATKException if the FIFO cannot be opened for writing
     */
    public void initStreamWriter(Function<T, ByteArrayOutputStream> itemSerializer) {
        Utils.nonNull(itemSerializer, "An item serializer must be provided for the async writer service");
        this.dataTransferFIFOFile = this.spController.createDataFIFO();
        // The Python command is issued asynchronously, and its ack is only awaited after the Java side
        // has opened the FIFO for writing.
        this.sendAsynchronousCommand(String.format(PYTHON_INITIALIZE_DATA_FIFO, this.dataTransferFIFOFile.getAbsolutePath()));
        try {
            this.dataTransferFIFOWriter = new FileOutputStream(this.dataTransferFIFOFile);
            this.asyncWriter = this.spController.getAsynchronousStreamWriter(this.dataTransferFIFOWriter, itemSerializer);
            this.waitForAck();
        }
        catch (IOException e) {
            throw new GATKException("Failure opening FIFO for writing", e);
        }
    }

    /**
     * Send a Python command asynchronously and start writing a batch of items through the asynchronous
     * writer. Complete the batch with {@link #waitForPreviousBatchCompletion()}.
     *
     * @param pythonCommand newline-terminated Python command to send; must not be null
     * @param batchList     items to write; must be neither null nor empty
     * @throws GATKException if {@link #initStreamWriter} has not been called
     */
    public void startBatchWrite(String pythonCommand, List<T> batchList) {
        Utils.nonNull(pythonCommand);
        Utils.nonNull(batchList);
        Utils.nonEmpty(batchList);
        // Validate the writer before sending anything, so that a missing initStreamWriter() call does not
        // leave a command and an outstanding ack request behind.
        this.requireStreamWriter();
        this.sendAsynchronousCommand(pythonCommand);
        this.asyncWriter.startBatchWrite(batchList);
    }

    /**
     * Wait for the previously started batch, if any, and then for the ack of the command that was sent
     * with it.
     *
     * @return the value returned by the asynchronous writer's {@code waitForPreviousBatchCompletion()};
     *         may be null, in which case no ack is awaited
     * @throws GATKException if {@link #initStreamWriter} has not been called
     */
    public Future<Integer> waitForPreviousBatchCompletion() {
        this.requireStreamWriter();
        Future<Integer> numberOfItemsWritten = this.asyncWriter.waitForPreviousBatchCompletion();
        if (numberOfItemsWritten != null) {
            this.waitForAck();
        }
        return numberOfItemsWritten;
    }

    /**
     * @return the process object exposed by the underlying process controller
     */
    @VisibleForTesting
    protected Process getProcess() {
        return this.spController.getProcess();
    }

    /**
     * Shut down the Python side (end profiling if enabled, close the data FIFO if one was opened, send the
     * terminate command) and then terminate the process controller.
     *
     * <p>The process controller is always asked to terminate, even when one of the preceding shutdown
     * steps fails, so that a failed shutdown does not leave the Python process behind. In that case the
     * shutdown failure is rethrown, and any failure raised while terminating the controller is attached
     * to it as a suppressed exception.
     *
     * @throws GATKException                 if the asynchronous writer fails to terminate, the FIFO writer
     *                                       cannot be closed, or an ack request is already outstanding
     * @throws PythonScriptExecutorException if the Python process answers one of the shutdown commands
     *                                       with a negative ack
     */
    public void terminate() {
        RuntimeException shutdownFailure = null;
        try {
            if (this.profileResults != null) {
                this.spController.writeProcessInput(String.format(PYTHON_END_PROFILING, this.profileResults.getAbsolutePath()));
                this.sendAckRequest();
                this.waitForAck();
            }
            if (this.dataTransferFIFOWriter != null) {
                this.closeDataFIFO();
            }
            this.spController.writeProcessInput(PYTHON_TERMINATE_GATK);
        }
        catch (RuntimeException e) {
            shutdownFailure = e;
            throw e;
        }
        finally {
            try {
                this.spController.terminate();
            }
            catch (RuntimeException e) {
                if (shutdownFailure == null) {
                    throw e;
                }
                // Keep the original shutdown failure as the primary exception.
                shutdownFailure.addSuppressed(e);
            }
        }
    }

    /**
     * @return the process output currently held by the underlying process controller
     */
    public ProcessOutput getAccumulatedOutput() {
        return this.spController.getProcessOutput();
    }

    /**
     * Terminate the asynchronous writer (if any), ask the Python process to close its end of the data
     * FIFO, wait for the ack, and then close the Java-side writer.
     */
    private void closeDataFIFO() {
        if (this.asyncWriter != null && !this.asyncWriter.terminate()) {
            throw new GATKException("failed to close asyncWriter");
        }
        this.spController.writeProcessInput(PYTHON_CLOSE_DATA_FIFO);
        this.sendAckRequest();
        this.waitForAck();
        try {
            this.dataTransferFIFOWriter.close();
            this.dataTransferFIFOWriter = null;
            this.dataTransferFIFOFile = null;
        }
        catch (IOException e) {
            throw new GATKException("IOException closing fifo", e);
        }
    }

    /**
     * Fail with a descriptive exception if the asynchronous writer has not been created yet.
     */
    private void requireStreamWriter() {
        if (this.asyncWriter == null) {
            throw new GATKException("The stream writer has not been initialized. initStreamWriter must be called first");
        }
    }

    /**
     * Import the {@code gatktool} module in the Python process, hand it the ack FIFO path, and optionally
     * start profiling.
     *
     * @param ackFIFOFile the ack FIFO file returned by the process controller's {@code start()}
     */
    private void initializeTool(File ackFIFOFile) {
        this.spController.writeProcessInput(PYTHON_IMPORT_GATK);
        this.spController.writeProcessInput(String.format(PYTHON_INITIALIZE_GATK, ackFIFOFile.getAbsolutePath()));
        // The ack request is written before the Java side opens the ack FIFO for reading.
        this.sendAckRequest();
        this.spController.openAckFIFOForRead();
        this.waitForAck();
        if (this.profileResults != null) {
            this.spController.writeProcessInput(PYTHON_START_PROFILING);
            this.sendAckRequest();
            this.waitForAck();
        }
    }

    /**
     * Write an ack request to the Python process and record that it is outstanding.
     *
     * @throws GATKException if a previous ack request has not been retrieved yet
     */
    private void sendAckRequest() {
        if (this.isAckRequestOutstanding) {
            throw new GATKException("An ack request is already outstanding. The previous ack request must be retrieved before a new ack request can be issued");
        }
        this.spController.writeProcessInput(PYTHON_SEND_ACK_REQUEST);
        this.isAckRequestOutstanding = true;
    }
}

