/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.multilang;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask;
import com.amazonaws.services.kinesis.multilang.MessageReader;
import com.amazonaws.services.kinesis.multilang.MessageWriter;
import com.amazonaws.services.kinesis.multilang.MultiLangProtocol;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MultiLangRecordProcessor
implements IRecordProcessor {
    private static final Log LOG = LogFactory.getLog(MultiLangRecordProcessor.class);
    private static final int EXIT_VALUE = 1;
    private volatile boolean initialized;
    private String shardId;
    private Future<?> stderrReadTask;
    private MessageWriter messageWriter;
    private MessageReader messageReader;
    private DrainChildSTDERRTask readSTDERRTask;
    private ProcessBuilder processBuilder;
    private Process process;
    private ExecutorService executorService;
    private ProcessState state;
    private ObjectMapper objectMapper;
    private MultiLangProtocol protocol;

    MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper) {
        this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), new DrainChildSTDERRTask());
    }

    MultiLangRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask) {
        this.executorService = executorService;
        this.processBuilder = processBuilder;
        this.objectMapper = objectMapper;
        this.messageWriter = messageWriter;
        this.messageReader = messageReader;
        this.readSTDERRTask = readSTDERRTask;
        this.state = ProcessState.ACTIVE;
    }

    @Override
    public void initialize(String shardIdToProcess) {
        try {
            this.shardId = shardIdToProcess;
            try {
                this.process = this.startProcess();
            }
            catch (IOException e) {
                throw new IOException("Failed to start client executable", e);
            }
            this.messageWriter.initialize(this.process.getOutputStream(), this.shardId, this.objectMapper, this.executorService);
            this.messageReader.initialize(this.process.getInputStream(), this.shardId, this.objectMapper, this.executorService);
            this.readSTDERRTask.initialize(this.process.getErrorStream(), this.shardId, "Reading STDERR for " + this.shardId);
            this.stderrReadTask = this.executorService.submit(this.readSTDERRTask);
            this.protocol = new MultiLangProtocol(this.messageReader, this.messageWriter, this.shardId);
            if (!this.protocol.initialize()) {
                throw new RuntimeException("Failed to initialize child process");
            }
            this.initialized = true;
        }
        catch (Throwable t) {
            this.stopProcessing("Encountered an error while trying to initialize record processor", t);
        }
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        try {
            if (!this.protocol.processRecords(records, checkpointer)) {
                throw new RuntimeException("Child process failed to process records");
            }
        }
        catch (Throwable t) {
            this.stopProcessing("Encountered an error while trying to process records", t);
        }
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        if (!this.initialized) {
            LOG.info((Object)"Record processor was not initialized and will not have a child process, so not invoking child process shutdown.");
            this.state = ProcessState.SHUTDOWN;
            return;
        }
        try {
            if (ProcessState.ACTIVE.equals((Object)this.state)) {
                if (!this.protocol.shutdown(checkpointer, reason)) {
                    throw new RuntimeException("Child process failed to shutdown");
                }
                this.childProcessShutdownSequence();
            } else {
                LOG.warn((Object)"Shutdown was called but this processor is already shutdown. Not doing anything.");
            }
        }
        catch (Throwable t) {
            if (ProcessState.ACTIVE.equals((Object)this.state)) {
                this.stopProcessing("Encountered an error while trying to shutdown child process", t);
            }
            this.stopProcessing("Encountered an error during shutdown, but it appears the processor has already been shutdown", t);
        }
    }

    private void childProcessShutdownSequence() {
        try {
            if (this.messageWriter.isOpen()) {
                this.messageWriter.close();
            }
        }
        catch (IOException e) {
            LOG.error((Object)"Encountered exception while trying to close output stream.", (Throwable)e);
        }
        this.safelyWaitOnFuture(this.messageReader.drainSTDOUT(), "draining STDOUT");
        this.safelyWaitOnFuture(this.stderrReadTask, "draining STDERR");
        this.safelyCloseInputStream(this.process.getErrorStream(), "STDERR");
        this.safelyCloseInputStream(this.process.getInputStream(), "STDOUT");
        try {
            LOG.info((Object)("Child process exited with value: " + this.process.waitFor()));
        }
        catch (InterruptedException e) {
            LOG.error((Object)"Interrupted before process finished exiting. Attempting to kill process.");
            this.process.destroy();
        }
        this.state = ProcessState.SHUTDOWN;
    }

    private void safelyCloseInputStream(InputStream inputStream, String name) {
        try {
            inputStream.close();
        }
        catch (IOException e) {
            LOG.error((Object)("Encountered exception while trying to close " + name + " stream."), (Throwable)e);
        }
    }

    private void safelyWaitOnFuture(Future<?> future, String whatThisFutureIsDoing) {
        try {
            future.get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error((Object)("Encountered error while " + whatThisFutureIsDoing + " for shard " + this.shardId), (Throwable)e);
        }
    }

    private void stopProcessing(String message, Throwable reason) {
        try {
            LOG.error((Object)message, reason);
            if (!this.state.equals((Object)ProcessState.SHUTDOWN)) {
                this.childProcessShutdownSequence();
            }
        }
        catch (Throwable t) {
            LOG.error((Object)"Encountered error while trying to shutdown", t);
        }
        this.exit();
    }

    void exit() {
        System.exit(1);
    }

    Process startProcess() throws IOException {
        return this.processBuilder.start();
    }

    private static enum ProcessState {
        ACTIVE,
        SHUTDOWN;

    }
}

