/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.multilang;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.multilang.messages.Message;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
import software.amazon.kinesis.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.kinesis.shaded.org.apache.commons.logging.Log;
import software.amazon.kinesis.shaded.org.apache.commons.logging.LogFactory;

class MessageWriter {
    private static final Log LOG = LogFactory.getLog(MessageWriter.class);
    private BufferedWriter writer;
    private volatile boolean open = true;
    private String shardId;
    private ObjectMapper objectMapper;
    private ExecutorService executorService;

    MessageWriter() {
    }

    private Future<Boolean> writeMessageToOutput(final String message) throws IOException {
        Callable<Boolean> writeMessageToOutputTask = new Callable<Boolean>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Boolean call() throws Exception {
                try {
                    BufferedWriter bufferedWriter = MessageWriter.this.writer;
                    synchronized (bufferedWriter) {
                        MessageWriter.this.writer.write(message, 0, message.length());
                        MessageWriter.this.writer.write(System.lineSeparator(), 0, System.lineSeparator().length());
                        MessageWriter.this.writer.flush();
                    }
                    LOG.info("Message size == " + message.getBytes().length + " bytes for shard " + MessageWriter.this.shardId);
                }
                catch (IOException e) {
                    MessageWriter.this.open = false;
                }
                return MessageWriter.this.open;
            }
        };
        if (this.open) {
            return this.executorService.submit(writeMessageToOutputTask);
        }
        String errorMessage = "Cannot write message " + message + " because writer is closed for shard " + this.shardId;
        LOG.info(errorMessage);
        throw new IllegalStateException(errorMessage);
    }

    private Future<Boolean> writeMessage(Message message) {
        LOG.info("Writing " + message.getClass().getSimpleName() + " to child process for shard " + this.shardId);
        try {
            String jsonText = this.objectMapper.writeValueAsString(message);
            return this.writeMessageToOutput(jsonText);
        }
        catch (IOException e) {
            String errorMessage = String.format("Encountered I/O error while writing %s action to subprocess", message.getClass().getSimpleName());
            LOG.error(errorMessage, e);
            throw new RuntimeException(errorMessage, e);
        }
    }

    Future<Boolean> writeInitializeMessage(InitializationInput initializationInput) {
        return this.writeMessage(new InitializeMessage(initializationInput));
    }

    Future<Boolean> writeProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
        return this.writeMessage(new ProcessRecordsMessage(processRecordsInput));
    }

    Future<Boolean> writeShutdownMessage(ShutdownReason reason) {
        return this.writeMessage(new ShutdownMessage(reason));
    }

    Future<Boolean> writeShutdownRequestedMessage() {
        return this.writeMessage(new ShutdownRequestedMessage());
    }

    Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Long subSequenceNumber, Throwable throwable) {
        return this.writeMessage(new CheckpointMessage(sequenceNumber, subSequenceNumber, throwable));
    }

    void close() throws IOException {
        this.open = false;
        this.writer.close();
    }

    boolean isOpen() {
        return this.open;
    }

    MessageWriter initialize(OutputStream stream, String shardId, ObjectMapper objectMapper, ExecutorService executorService) {
        return this.initialize(new BufferedWriter(new OutputStreamWriter(stream)), shardId, objectMapper, executorService);
    }

    MessageWriter initialize(BufferedWriter writer, String shardId, ObjectMapper objectMapper, ExecutorService executorService) {
        this.writer = writer;
        this.shardId = shardId;
        this.objectMapper = objectMapper;
        this.executorService = executorService;
        return this;
    }
}

