/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.kafka.eventhandling;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.Assert;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventsourcing.GenericDomainEventMessage;
import org.axonframework.kafka.eventhandling.HeaderUtils;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.LazyDeserializingObject;
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaMessageConverter} that maps Axon {@link EventMessage}s to Kafka records with a
 * {@code String} key and a {@code byte[]} value, and back.
 * <p>
 * When producing, the payload is serialized to bytes with the configured {@link Serializer}, the
 * record key is the string form of the identifier supplied by the configured
 * {@link SequencingPolicy}, and the record headers are built by {@link HeaderUtils#toHeaders}.
 * When consuming, only records that carry both the message-id and message-type headers are
 * converted; every other record (and every record whose conversion fails) yields an empty
 * {@link Optional}.
 */
public class DefaultKafkaMessageConverter
implements KafkaMessageConverter<String, byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaMessageConverter.class);

    // Names of the Kafka record headers read by this converter.
    private static final String MESSAGE_ID = "axon-message-id";
    private static final String MESSAGE_TYPE = "axon-message-type";
    private static final String MESSAGE_REVISION = "axon-message-revision";
    private static final String MESSAGE_TIMESTAMP = "axon-message-timestamp";
    private static final String AGGREGATE_ID = "axon-message-aggregate-id";
    private static final String AGGREGATE_TYPE = "axon-message-aggregate-type";
    private static final String AGGREGATE_SEQ = "axon-message-aggregate-seq";

    private final Serializer serializer;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final BiFunction<String, Object, RecordHeader> headerValueMapper;

    /**
     * Creates a converter that uses {@link SequentialPerAggregatePolicy#instance()} to derive
     * record keys and {@link HeaderUtils#byteMapper()} to map header values.
     *
     * @param serializer the serializer used for event payloads; may not be {@code null}
     */
    public DefaultKafkaMessageConverter(Serializer serializer) {
        this(serializer, SequentialPerAggregatePolicy.instance(), HeaderUtils.byteMapper());
    }

    /**
     * Creates a converter with the given collaborators.
     *
     * @param serializer        the serializer used for event payloads; may not be {@code null}
     * @param sequencingPolicy  supplies the identifier that becomes the record key; may not be
     *                          {@code null}
     * @param headerValueMapper turns a header name and value into a {@link RecordHeader}; may not
     *                          be {@code null}
     */
    public DefaultKafkaMessageConverter(Serializer serializer, SequencingPolicy<? super EventMessage<?>> sequencingPolicy, BiFunction<String, Object, RecordHeader> headerValueMapper) {
        Assert.notNull(serializer, () -> "Serializer may not be null");
        Assert.notNull(sequencingPolicy, () -> "SequencingPolicy may not be null");
        Assert.notNull(headerValueMapper, () -> "HeaderValueMapper may not be null");
        this.serializer = serializer;
        this.sequencingPolicy = sequencingPolicy;
        this.headerValueMapper = headerValueMapper;
    }

    /**
     * Builds the Kafka record for the given event. Partition and timestamp are left unset
     * ({@code null}) on the record.
     *
     * @param eventMessage the event to publish
     * @param topic        the topic the record is addressed to
     * @return the record carrying the serialized payload, the sequencing key and the headers
     */
    @Override
    public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        SerializedObject<byte[]> serializedObject = eventMessage.serializePayload(this.serializer, byte[].class);
        byte[] payload = serializedObject.getData();
        return new ProducerRecord<>(topic, null, null, this.key(eventMessage), payload,
                HeaderUtils.toHeaders(eventMessage, serializedObject, this.headerValueMapper));
    }

    /**
     * Derives the record key from the sequencing policy.
     *
     * @return the identifier's {@code toString()}, or {@code null} when the policy returns no
     *         identifier
     */
    private String key(EventMessage<?> eventMessage) {
        Object identifier = this.sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        return identifier != null ? identifier.toString() : null;
    }

    /**
     * Converts a consumed Kafka record back into an event message.
     *
     * @param consumerRecord the record read from Kafka
     * @return the event message, or an empty {@link Optional} when the record lacks the required
     *         headers or when any exception is raised during conversion (the exception is logged
     *         at trace level and not propagated)
     */
    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            Headers headers = consumerRecord.headers();
            if (this.isAxonMessage(headers)) {
                byte[] messageBody = consumerRecord.value();
                SerializedMessage<?> message = this.extractSerializedMessage(headers, messageBody);
                return this.buildMessage(headers, message);
            }
        }
        catch (Exception e) {
            // The trailing Throwable is not bound to a placeholder, so SLF4J logs it as the cause.
            logger.trace("Error converting {} to axon", consumerRecord, e);
        }
        return Optional.empty();
    }

    /**
     * Wraps the serialized message in a domain event when an aggregate-id header is present and
     * in a plain event otherwise.
     *
     * @return the event, or an empty {@link Optional} when the timestamp header is missing
     */
    private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
        // valueAsLong returns a boxed Long; check it explicitly instead of relying on an
        // unboxing NullPointerException being swallowed by the caller's catch block.
        Long timestamp = HeaderUtils.valueAsLong(headers, MESSAGE_TIMESTAMP);
        if (timestamp == null) {
            logger.trace("Cannot convert record to axon: header '{}' is missing", MESSAGE_TIMESTAMP);
            return Optional.empty();
        }
        return headers.lastHeader(AGGREGATE_ID) != null
                ? this.domainEvent(headers, message, timestamp)
                : this.event(message, timestamp);
    }

    /**
     * Assembles a {@link SerializedMessage} whose payload and metadata are wrapped in
     * {@link LazyDeserializingObject}s. The type and identifier come from their headers; the
     * revision header is read with a {@code null} default.
     */
    private SerializedMessage<?> extractSerializedMessage(Headers headers, byte[] messageBody) {
        SimpleSerializedObject<byte[]> serializedObject = new SimpleSerializedObject<>(
                messageBody,
                byte[].class,
                HeaderUtils.valueAsString(headers, MESSAGE_TYPE),
                HeaderUtils.valueAsString(headers, MESSAGE_REVISION, null));
        return new SerializedMessage<>(
                HeaderUtils.valueAsString(headers, MESSAGE_ID),
                new LazyDeserializingObject<>(serializedObject, this.serializer),
                new LazyDeserializingObject<>(MetaData.from(HeaderUtils.extractAxonMetadata(headers))));
    }

    /**
     * @return {@code true} when the headers contain both the message-id and the message-type key
     */
    private boolean isAxonMessage(Headers headers) {
        return HeaderUtils.keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
    }

    /**
     * Builds a domain event from the aggregate type, identifier and sequence-number headers.
     *
     * @return the domain event, or an empty {@link Optional} when the sequence-number header is
     *         missing
     */
    private Optional<EventMessage<?>> domainEvent(Headers headers, SerializedMessage<?> message, long timestamp) {
        Long sequenceNumber = HeaderUtils.valueAsLong(headers, AGGREGATE_SEQ);
        if (sequenceNumber == null) {
            logger.trace("Cannot convert record to axon: header '{}' is missing", AGGREGATE_SEQ);
            return Optional.empty();
        }
        return Optional.of(new GenericDomainEventMessage<>(
                HeaderUtils.valueAsString(headers, AGGREGATE_TYPE),
                HeaderUtils.valueAsString(headers, AGGREGATE_ID),
                sequenceNumber.longValue(),
                message,
                () -> Instant.ofEpochMilli(timestamp)));
    }

    /**
     * Builds a plain event whose timestamp is supplied from the given epoch-millisecond value.
     */
    private Optional<EventMessage<?>> event(SerializedMessage<?> message, long timestamp) {
        return Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp)));
    }
}

