/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.contribs.tasks.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.contribs.tasks.kafka.KafkaProducerManager;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component(value="KAFKA_PUBLISH")
public class KafkaPublishTask
extends WorkflowSystemTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublishTask.class);
    static final String REQUEST_PARAMETER_NAME = "kafka_request";
    private static final String MISSING_REQUEST = "Missing Kafka request. Task input MUST have a 'kafka_request' key with KafkaTask.Input as value. See documentation for KafkaTask for required input parameters";
    private static final String MISSING_BOOT_STRAP_SERVERS = "No boot strap servers specified";
    private static final String MISSING_KAFKA_TOPIC = "Missing Kafka topic. See documentation for KafkaTask for required input parameters";
    private static final String MISSING_KAFKA_VALUE = "Missing Kafka value.  See documentation for KafkaTask for required input parameters";
    private static final String FAILED_TO_INVOKE = "Failed to invoke kafka task due to: ";
    private final ObjectMapper objectMapper;
    private final String requestParameter;
    private final KafkaProducerManager producerManager;

    @Autowired
    public KafkaPublishTask(KafkaProducerManager clientManager, ObjectMapper objectMapper) {
        super("KAFKA_PUBLISH");
        this.requestParameter = REQUEST_PARAMETER_NAME;
        this.producerManager = clientManager;
        this.objectMapper = objectMapper;
        LOGGER.info("KafkaTask initialized.");
    }

    public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
        long taskStartMillis = Instant.now().toEpochMilli();
        task.setWorkerId(Utils.getServerId());
        Object request = task.getInputData().get(this.requestParameter);
        if (Objects.isNull(request)) {
            this.markTaskAsFailed(task, MISSING_REQUEST);
            return;
        }
        Input input = (Input)this.objectMapper.convertValue(request, Input.class);
        if (StringUtils.isBlank((CharSequence)input.getBootStrapServers())) {
            this.markTaskAsFailed(task, MISSING_BOOT_STRAP_SERVERS);
            return;
        }
        if (StringUtils.isBlank((CharSequence)input.getTopic())) {
            this.markTaskAsFailed(task, MISSING_KAFKA_TOPIC);
            return;
        }
        if (Objects.isNull(input.getValue())) {
            this.markTaskAsFailed(task, MISSING_KAFKA_VALUE);
            return;
        }
        try {
            Future<RecordMetadata> recordMetaDataFuture = this.kafkaPublish(input);
            try {
                recordMetaDataFuture.get();
                if (this.isAsyncComplete(task)) {
                    task.setStatus(TaskModel.Status.IN_PROGRESS);
                } else {
                    task.setStatus(TaskModel.Status.COMPLETED);
                }
                long timeTakenToCompleteTask = Instant.now().toEpochMilli() - taskStartMillis;
                LOGGER.debug("Published message {}, Time taken {}", (Object)input, (Object)timeTakenToCompleteTask);
            }
            catch (ExecutionException ec) {
                LOGGER.error("Failed to invoke kafka task: {} - execution exception ", (Object)task.getTaskId(), (Object)ec);
                this.markTaskAsFailed(task, FAILED_TO_INVOKE + ec.getMessage());
            }
        }
        catch (Exception e) {
            LOGGER.error("Failed to invoke kafka task:{} for input {} - unknown exception", new Object[]{task.getTaskId(), input, e});
            this.markTaskAsFailed(task, FAILED_TO_INVOKE + e.getMessage());
        }
    }

    private void markTaskAsFailed(TaskModel task, String reasonForIncompletion) {
        task.setReasonForIncompletion(reasonForIncompletion);
        task.setStatus(TaskModel.Status.FAILED);
    }

    private Future<RecordMetadata> kafkaPublish(Input input) throws Exception {
        long startPublishingEpochMillis = Instant.now().toEpochMilli();
        Producer producer = this.producerManager.getProducer(input);
        long timeTakenToCreateProducer = Instant.now().toEpochMilli() - startPublishingEpochMillis;
        LOGGER.debug("Time taken getting producer {}", (Object)timeTakenToCreateProducer);
        Object key = this.getKey(input);
        Iterable headers = input.getHeaders().entrySet().stream().map(header -> new RecordHeader((String)header.getKey(), String.valueOf(header.getValue()).getBytes())).collect(Collectors.toList());
        ProducerRecord rec = new ProducerRecord(input.getTopic(), null, null, key, (Object)this.objectMapper.writeValueAsString(input.getValue()), headers);
        Future send = producer.send(rec);
        long timeTakenToPublish = Instant.now().toEpochMilli() - startPublishingEpochMillis;
        LOGGER.debug("Time taken publishing {}", (Object)timeTakenToPublish);
        return send;
    }

    @VisibleForTesting
    Object getKey(Input input) {
        String keySerializer = input.getKeySerializer();
        if (LongSerializer.class.getCanonicalName().equals(keySerializer)) {
            return Long.parseLong(String.valueOf(input.getKey()));
        }
        if (IntegerSerializer.class.getCanonicalName().equals(keySerializer)) {
            return Integer.parseInt(String.valueOf(input.getKey()));
        }
        return String.valueOf(input.getKey());
    }

    public boolean execute(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
        return false;
    }

    public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor executor) {
        task.setStatus(TaskModel.Status.CANCELED);
    }

    public boolean isAsync() {
        return true;
    }

    public static class Input {
        public static final String STRING_SERIALIZER = StringSerializer.class.getCanonicalName();
        private Map<String, Object> headers = new HashMap<String, Object>();
        private String bootStrapServers;
        private Object key;
        private Object value;
        private Integer requestTimeoutMs;
        private Integer maxBlockMs;
        private String topic;
        private String keySerializer = STRING_SERIALIZER;

        public Map<String, Object> getHeaders() {
            return this.headers;
        }

        public void setHeaders(Map<String, Object> headers) {
            this.headers = headers;
        }

        public String getBootStrapServers() {
            return this.bootStrapServers;
        }

        public void setBootStrapServers(String bootStrapServers) {
            this.bootStrapServers = bootStrapServers;
        }

        public Object getKey() {
            return this.key;
        }

        public void setKey(Object key) {
            this.key = key;
        }

        public Object getValue() {
            return this.value;
        }

        public void setValue(Object value) {
            this.value = value;
        }

        public Integer getRequestTimeoutMs() {
            return this.requestTimeoutMs;
        }

        public void setRequestTimeoutMs(Integer requestTimeoutMs) {
            this.requestTimeoutMs = requestTimeoutMs;
        }

        public String getTopic() {
            return this.topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getKeySerializer() {
            return this.keySerializer;
        }

        public void setKeySerializer(String keySerializer) {
            this.keySerializer = keySerializer;
        }

        public Integer getMaxBlockMs() {
            return this.maxBlockMs;
        }

        public void setMaxBlockMs(Integer maxBlockMs) {
            this.maxBlockMs = maxBlockMs;
        }

        public String toString() {
            return "Input{headers=" + this.headers + ", bootStrapServers='" + this.bootStrapServers + "', key=" + this.key + ", value=" + this.value + ", requestTimeoutMs=" + this.requestTimeoutMs + ", maxBlockMs=" + this.maxBlockMs + ", topic='" + this.topic + "', keySerializer='" + this.keySerializer + "'}";
        }
    }
}

