/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.ExternalCommandSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalCommandWorker
implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(ExternalCommandWorker.class);

    /**
     * Default shutdown grace period, in milliseconds.
     */
    private static final int DEFAULT_SHUTDOWN_GRACE_PERIOD_MS = 5000;

    /**
     * Flipped to true by start() and back to false by stop(); guards against
     * starting twice or stopping a worker that is not running.
     */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Actions consumed by the Terminator task.
     */
    private final LinkedBlockingQueue<TerminatorAction> terminatorActionQueue = new LinkedBlockingQueue<>();

    /**
     * JSON messages to be written to the process's stdin.  An empty Optional
     * tells the StdinWriter task to terminate.
     */
    private final LinkedBlockingQueue<Optional<JsonNode>> stdinQueue = new LinkedBlockingQueue<>();

    /**
     * The task ID, used as a prefix in log messages and sent to the process in the start message.
     */
    private final String id;

    /**
     * The specification supplying the command line, the workload and the optional shutdown grace period.
     */
    private final ExternalCommandSpec spec;

    /**
     * Status tracker supplied to start(); cleared by stop().
     */
    private WorkerStatusTracker status;

    /**
     * Future supplied to start() that the helper tasks complete with an error
     * string, or with the empty string on a clean exit; cleared by stop().
     */
    private KafkaFutureImpl<String> doneFuture;

    /**
     * Executor running the helper tasks; created by start() and cleared by stop().
     */
    private ExecutorService executor;

    /**
     * Creates a worker for the given task.  Nothing is launched until start() is called.
     *
     * @param id    the task ID
     * @param spec  the specification describing the external command
     */
    public ExternalCommandWorker(String id, ExternalCommandSpec spec) {
        this.spec = spec;
        this.id = id;
    }

    /**
     * Launches the external process and the helper tasks that service it, then
     * queues the initial JSON message (task id and workload) for the process's stdin.
     *
     * If the process cannot be launched, the executor is shut down and
     * doneFuture is completed with an error message instead of throwing.
     *
     * @param platform    not used by this worker
     * @param status      tracker updated with "status" values reported by the process
     * @param doneFuture  completed with an error message, or the empty string on a clean exit
     * @throws IllegalStateException if the worker is already running
     */
    @Override
    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) throws Exception {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("ExternalCommandWorker is already running.");
        }
        log.info("{}: Activating ExternalCommandWorker with {}", id, spec);
        this.status = status;
        this.doneFuture = doneFuture;
        this.executor = Executors.newCachedThreadPool(
            ThreadUtils.createThreadFactory("ExternalCommandWorkerThread%d", false));
        Process process;
        try {
            process = startProcess();
        } catch (Throwable t) {
            log.error("{}: Unable to start process", id, t);
            executor.shutdown();
            doneFuture.complete("Unable to start process: " + t.getMessage());
            return;
        }
        Future<?> stdoutFuture = executor.submit(new StdoutMonitor(process));
        Future<?> stderrFuture = executor.submit(new StderrMonitor(process));
        executor.submit(new StdinWriter(process));
        Future<?> terminatorFuture = executor.submit(new Terminator(process));
        // The exit monitor waits on the other tasks, so it is handed their futures.
        executor.submit(new ExitMonitor(process, stdoutFuture, stderrFuture, terminatorFuture));
        ObjectNode startMessage = new ObjectNode(JsonNodeFactory.instance);
        startMessage.set("id", new TextNode(id));
        startMessage.set("workload", spec.workload());
        stdinQueue.add(Optional.of(startMessage));
    }

    /**
     * Launches the command given by the spec.
     *
     * @return the newly started process
     * @throws RuntimeException if the spec's command is empty
     * @throws Exception        if the process cannot be started
     */
    private Process startProcess() throws Exception {
        if (spec.command().isEmpty()) {
            throw new RuntimeException("No command specified");
        }
        return new ProcessBuilder(spec.command()).start();
    }

    /**
     * Parses one line of process output as JSON.
     *
     * Lines that cannot be parsed are deliberately not treated as errors: the
     * caller only acts on known fields, so such lines map to a null node.
     *
     * @param line  the text to parse
     * @return the parsed tree, or NullNode.instance if the line could not be
     *         parsed or yielded no tree; never null
     */
    private static JsonNode readObject(String line) {
        try {
            JsonNode resp = JsonUtil.JSON_SERDE.readTree(line);
            // readTree is documented to return null for content-free input in
            // older Jackson releases; normalize so callers can use has()/get().
            return resp == null ? NullNode.instance : resp;
        } catch (IOException e) {
            return NullNode.instance;
        }
    }

    /**
     * Stops the worker: asks the Terminator task to destroy the process, waits
     * for the executor to terminate, and escalates to a forcible destroy if
     * the grace period elapses first.
     *
     * @param platform  not used by this worker
     * @throws IllegalStateException if the worker is not running
     * @throws Exception             if waiting for the executor is interrupted
     */
    @Override
    public void stop(Platform platform) throws Exception {
        if (!running.compareAndSet(true, false)) {
            throw new IllegalStateException("ExternalCommandWorker is not running.");
        }
        log.info("{}: Deactivating ExternalCommandWorker.", id);
        terminatorActionQueue.add(TerminatorAction.DESTROY);
        int shutdownGracePeriodMs = spec.shutdownGracePeriodMs().isPresent()
            ? spec.shutdownGracePeriodMs().get()
            : DEFAULT_SHUTDOWN_GRACE_PERIOD_MS;
        if (!executor.awaitTermination(shutdownGracePeriodMs, TimeUnit.MILLISECONDS)) {
            // The process did not go away within the grace period; escalate.
            terminatorActionQueue.add(TerminatorAction.DESTROY_FORCIBLY);
            executor.awaitTermination(1L, TimeUnit.DAYS);
        }
        this.status = null;
        this.doneFuture = null;
        this.executor = null;
    }

    class Terminator
    implements Runnable {
        private final Process process;

        Terminator(Process process) {
            this.process = process;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    switch ((TerminatorAction)((Object)ExternalCommandWorker.this.terminatorActionQueue.take())) {
                        case DESTROY: {
                            log.info("{}: destroying process", (Object)ExternalCommandWorker.this.id);
                            this.process.getInputStream().close();
                            this.process.getErrorStream().close();
                            this.process.destroy();
                            break;
                        }
                        case DESTROY_FORCIBLY: {
                            log.info("{}: forcibly destroying process", (Object)ExternalCommandWorker.this.id);
                            this.process.getInputStream().close();
                            this.process.getErrorStream().close();
                            this.process.destroyForcibly();
                            break;
                        }
                        case CLOSE: {
                            log.trace("{}: closing Terminator thread.", (Object)ExternalCommandWorker.this.id);
                            return;
                        }
                    }
                }
            }
            catch (Throwable e) {
                log.error("{}: Terminator error", (Object)ExternalCommandWorker.this.id, (Object)e);
                ExternalCommandWorker.this.doneFuture.complete("Terminator error: " + e.getMessage());
                return;
            }
        }
    }

    class ExitMonitor
    implements Runnable {
        private final Process process;
        private final Future<?> stdoutFuture;
        private final Future<?> stderrFuture;
        private final Future<?> terminatorFuture;

        ExitMonitor(Process process, Future<?> stdoutFuture, Future<?> stderrFuture, Future<?> terminatorFuture) {
            this.process = process;
            this.stdoutFuture = stdoutFuture;
            this.stderrFuture = stderrFuture;
            this.terminatorFuture = terminatorFuture;
        }

        @Override
        public void run() {
            try {
                int exitStatus = this.process.waitFor();
                log.info("{}: process exited with return code {}", (Object)ExternalCommandWorker.this.id, (Object)exitStatus);
                this.stdoutFuture.get();
                this.stderrFuture.get();
                if (exitStatus == 0) {
                    ExternalCommandWorker.this.doneFuture.complete("");
                } else {
                    ExternalCommandWorker.this.doneFuture.complete("exited with return code " + exitStatus);
                }
                ExternalCommandWorker.this.stdinQueue.add(Optional.empty());
                ExternalCommandWorker.this.terminatorActionQueue.add(TerminatorAction.CLOSE);
                this.terminatorFuture.get();
                ExternalCommandWorker.this.executor.shutdown();
            }
            catch (Throwable e) {
                log.error("{}: ExitMonitor error", (Object)ExternalCommandWorker.this.id, (Object)e);
                ExternalCommandWorker.this.doneFuture.complete("ExitMonitor error: " + e.getMessage());
            }
        }
    }

    class StdinWriter
    implements Runnable {
        private final Process process;

        StdinWriter(Process process) {
            this.process = process;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            OutputStreamWriter stdinWriter = new OutputStreamWriter(this.process.getOutputStream(), StandardCharsets.UTF_8);
            try {
                while (true) {
                    log.info("{}: stdin writer ready.", (Object)ExternalCommandWorker.this.id);
                    Optional node = (Optional)ExternalCommandWorker.this.stdinQueue.take();
                    if (!node.isPresent()) {
                        log.trace("{}: StdinWriter terminating.", (Object)ExternalCommandWorker.this.id);
                        return;
                    }
                    String inputString = JsonUtil.toJsonString(node.get());
                    log.info("{}: writing to stdin: {}", (Object)ExternalCommandWorker.this.id, (Object)inputString);
                    stdinWriter.write(inputString + "\n");
                    stdinWriter.flush();
                    continue;
                    break;
                }
            }
            catch (IOException e) {
                log.info("{}: can't write any more to stdin: {}", (Object)ExternalCommandWorker.this.id, (Object)e.getMessage());
                return;
            }
            catch (Throwable e) {
                log.info("{}: error writing to stdin.", (Object)ExternalCommandWorker.this.id, (Object)e);
                return;
            }
            finally {
                try {
                    stdinWriter.close();
                }
                catch (IOException e) {
                    log.debug("{}: error closing stdinWriter: {}", (Object)ExternalCommandWorker.this.id, (Object)e.getMessage());
                }
            }
        }
    }

    class StderrMonitor
    implements Runnable {
        private final Process process;

        StderrMonitor(Process process) {
            this.process = process;
        }

        /*
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            log.trace("{}: starting stderr monitor.", (Object)ExternalCommandWorker.this.id);
            try {
                BufferedReader br = new BufferedReader(new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8));
                Throwable throwable = null;
                while (true) {
                    String line;
                    try {
                        try {
                            line = br.readLine();
                            if (line == null) {
                                throw new IOException("EOF");
                            }
                        }
                        catch (IOException e) {
                            log.info("{}: can't read any more from stderr: {}", (Object)ExternalCommandWorker.this.id, (Object)e.getMessage());
                            if (br == null) return;
                            if (throwable == null) {
                                br.close();
                                return;
                            }
                            try {
                                br.close();
                                return;
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                                return;
                            }
                        }
                    }
                    catch (Throwable throwable3) {
                        throwable = throwable3;
                        throw throwable3;
                    }
                    catch (Throwable throwable4) {
                        if (br == null) throw throwable4;
                        if (throwable == null) {
                            br.close();
                            throw throwable4;
                        }
                        try {
                            br.close();
                            throw throwable4;
                        }
                        catch (Throwable throwable5) {
                            throwable.addSuppressed(throwable5);
                            throw throwable4;
                        }
                    }
                    {
                        log.error("{}: (stderr):{}", (Object)ExternalCommandWorker.this.id, (Object)line);
                        continue;
                    }
                    break;
                }
            }
            catch (Throwable e) {
                log.info("{}: error reading from stderr.", (Object)ExternalCommandWorker.this.id, (Object)e);
                return;
            }
        }
    }

    class StdoutMonitor
    implements Runnable {
        private final Process process;

        StdoutMonitor(Process process) {
            this.process = process;
        }

        /*
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            log.trace("{}: starting stdout monitor.", (Object)ExternalCommandWorker.this.id);
            try {
                BufferedReader br = new BufferedReader(new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8));
                Throwable throwable = null;
                while (true) {
                    String line;
                    try {
                        try {
                            line = br.readLine();
                            if (line == null) {
                                throw new IOException("EOF");
                            }
                        }
                        catch (IOException e) {
                            log.info("{}: can't read any more from stdout: {}", (Object)ExternalCommandWorker.this.id, (Object)e.getMessage());
                            if (br == null) return;
                            if (throwable == null) {
                                br.close();
                                return;
                            }
                            try {
                                br.close();
                                return;
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                                return;
                            }
                        }
                    }
                    catch (Throwable throwable3) {
                        throwable = throwable3;
                        throw throwable3;
                    }
                    catch (Throwable throwable4) {
                        if (br == null) throw throwable4;
                        if (throwable == null) {
                            br.close();
                            throw throwable4;
                        }
                        try {
                            br.close();
                            throw throwable4;
                        }
                        catch (Throwable throwable5) {
                            throwable.addSuppressed(throwable5);
                            throw throwable4;
                        }
                    }
                    {
                        log.trace("{}: read line from stdin: {}", (Object)ExternalCommandWorker.this.id, (Object)line);
                        JsonNode resp = ExternalCommandWorker.readObject(line);
                        if (resp.has("status")) {
                            log.info("{}: New status: {}", (Object)ExternalCommandWorker.this.id, (Object)resp.get("status").toString());
                            ExternalCommandWorker.this.status.update(resp.get("status"));
                        }
                        if (resp.has("log")) {
                            log.info("{}: (stdout): {}", (Object)ExternalCommandWorker.this.id, (Object)resp.get("log").asText());
                        }
                        if (!resp.has("error")) continue;
                        String error = resp.get("error").asText();
                        log.error("{}: error: {}", (Object)ExternalCommandWorker.this.id, (Object)error);
                        ExternalCommandWorker.this.doneFuture.complete(error);
                        continue;
                    }
                    break;
                }
            }
            catch (Throwable e) {
                log.info("{}: error reading from stdout.", (Object)ExternalCommandWorker.this.id, (Object)e);
                return;
            }
        }
    }

    /**
     * Requests that can be placed on the terminator action queue for the
     * Terminator task.
     */
    enum TerminatorAction {
        /** Close the process's stdout and stderr streams and destroy the process. */
        DESTROY,

        /** Close the process's stdout and stderr streams and forcibly destroy the process. */
        DESTROY_FORCIBLY,

        /** Stop the Terminator task without touching the process. */
        CLOSE
    }
}

