/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.vertx.kafka.operations;

import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.vertx.kafka.VertxKafkaConfigurationOptionsProxy;
import org.apache.camel.component.vertx.kafka.VertxKafkaHeadersPropagation;
import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration;
import org.apache.camel.component.vertx.kafka.serde.VertxKafkaTypeSerializer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class VertxKafkaProducerOperations {
    private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaProducerOperations.class);
    private final KafkaProducer<Object, Object> kafkaProducer;
    private final VertxKafkaConfigurationOptionsProxy configurationOptionsProxy;

    public VertxKafkaProducerOperations(KafkaProducer<Object, Object> kafkaProducer, VertxKafkaConfiguration configuration) {
        ObjectHelper.notNull(kafkaProducer, (String)"kafkaProducer");
        ObjectHelper.notNull((Object)configuration, (String)"configuration");
        this.kafkaProducer = kafkaProducer;
        this.configurationOptionsProxy = new VertxKafkaConfigurationOptionsProxy(configuration);
    }

    public boolean sendEvents(Message inMessage, AsyncCallback callback) {
        return this.sendEvents(inMessage, unused -> LOG.debug("Processed one event..."), callback);
    }

    public boolean sendEvents(Message inMessage, Consumer<List<RecordMetadata>> resultCallback, AsyncCallback callback) {
        this.sendAsyncEvents(inMessage).subscribe(resultCallback, error -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error processing async exchange with error: {}", (Object)error.getMessage());
            }
            inMessage.getExchange().setException(error);
            callback.done(false);
        }, () -> {
            LOG.trace("All events with exchange have been sent successfully.");
            callback.done(false);
        });
        return false;
    }

    private Mono<List<RecordMetadata>> sendAsyncEvents(Message inMessage) {
        return Flux.fromIterable(this.createKafkaProducerRecords(inMessage)).flatMap(this::sendDataToKafka).collectList().doOnError(error -> LOG.error(error.getMessage()));
    }

    private Mono<RecordMetadata> sendDataToKafka(KafkaProducerRecord<Object, Object> producerRecord) {
        return Mono.create(sink -> this.kafkaProducer.send(producerRecord, asyncResult -> {
            if (asyncResult.failed()) {
                sink.error(asyncResult.cause());
            } else {
                sink.success((Object)((RecordMetadata)asyncResult.result()));
            }
        }));
    }

    private Iterable<KafkaProducerRecord<Object, Object>> createKafkaProducerRecords(Message inMessage) {
        if (inMessage.getBody() instanceof Iterable) {
            return this.createProducerRecordFromIterable((Iterable)inMessage.getBody(), inMessage);
        }
        return Collections.singletonList(this.createProducerRecordFromMessage(inMessage, null));
    }

    private Iterable<KafkaProducerRecord<Object, Object>> createProducerRecordFromIterable(Iterable<Object> inputData, Message message) {
        LinkedList<KafkaProducerRecord<Object, Object>> finalRecords = new LinkedList<KafkaProducerRecord<Object, Object>>();
        String parentTopic = this.getTopic(message, null);
        inputData.forEach(data -> {
            if (data instanceof Exchange) {
                finalRecords.add(this.createProducerRecordFromExchange((Exchange)data, parentTopic));
            } else if (data instanceof Message) {
                finalRecords.add(this.createProducerRecordFromMessage((Message)data, parentTopic));
            } else {
                finalRecords.add(this.createProducerRecordFromObject(data, message, parentTopic));
            }
        });
        return finalRecords;
    }

    private KafkaProducerRecord<Object, Object> createProducerRecordFromExchange(Exchange exchange, String parentTopic) {
        return this.createProducerRecordFromMessage(exchange.getIn(), parentTopic);
    }

    private KafkaProducerRecord<Object, Object> createProducerRecordFromMessage(Message message, String parentTopic) {
        return this.createProducerRecordFromObject(message.getBody(), message, parentTopic);
    }

    private KafkaProducerRecord<Object, Object> createProducerRecordFromObject(Object inputData, Message message, String parentTopic) {
        String topic = this.getTopic(message, parentTopic);
        Object messageKey = this.getMessageKey(message);
        Object messageValue = this.getMessageValue(message, inputData);
        Integer partitionId = this.getPartitionId(message);
        Long overrideTimestamp = this.getOverrideTimestamp(message);
        List<KafkaHeader> propagatedHeaders = new VertxKafkaHeadersPropagation(this.configurationOptionsProxy.getConfiguration().getHeaderFilterStrategy()).getPropagatedHeaders(message);
        return KafkaProducerRecord.create((String)topic, (Object)messageKey, (Object)messageValue, (Long)overrideTimestamp, (Integer)partitionId).addHeaders(propagatedHeaders);
    }

    private String getTopic(Message message, String parentTopic) {
        String innerMessageTopic;
        String innerOverrideTopic = this.configurationOptionsProxy.getOverrideTopic(message);
        String topic = this.getTopic(message, innerOverrideTopic, innerMessageTopic = (String)message.getHeader("CamelVertxKafkaTopic", String.class), parentTopic);
        if (ObjectHelper.isEmpty((Object)topic)) {
            throw new IllegalArgumentException("Topic cannot be empty, provide a topic in the config or in the headers.");
        }
        return topic;
    }

    private String getTopic(Message message, String innerOverrideTopic, String innerTopic, String parentTopic) {
        String firstCheckStep = ObjectHelper.isEmpty((Object)innerOverrideTopic) ? innerTopic : innerOverrideTopic;
        String secondCheckStep = ObjectHelper.isEmpty((Object)firstCheckStep) ? parentTopic : firstCheckStep;
        return ObjectHelper.isEmpty((Object)secondCheckStep) ? this.configurationOptionsProxy.getTopic(message) : secondCheckStep;
    }

    private Object getMessageKey(Message message) {
        return VertxKafkaTypeSerializer.tryConvertToSerializedType(message, this.configurationOptionsProxy.getMessageKey(message), this.configurationOptionsProxy.getKeySerializer(message));
    }

    private Integer getPartitionId(Message message) {
        return this.configurationOptionsProxy.getPartitionId(message);
    }

    private Object getMessageValue(Message message, Object inputData) {
        return VertxKafkaTypeSerializer.tryConvertToSerializedType(message, inputData, this.configurationOptionsProxy.getValueSerializer(message));
    }

    private Long getOverrideTimestamp(Message message) {
        Object timeStamp = this.configurationOptionsProxy.getOverrideTimestamp(message);
        Long overrideTimestamp = null;
        if (ObjectHelper.isNotEmpty((Object)timeStamp)) {
            overrideTimestamp = (long)((Long)timeStamp);
        }
        return overrideTimestamp;
    }
}

