/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.streams.messaging;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.kafka.streams.messaging.MessagingFunction;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class MessagingTransformer<K, V, R>
implements Transformer<K, V, KeyValue<K, R>> {
    private final MessagingFunction function;
    private final MessagingMessageConverter converter;
    private ProcessorContext processorContext;

    public MessagingTransformer(MessagingFunction function, MessagingMessageConverter converter) {
        Assert.notNull((Object)function, (String)"'function' cannot be null");
        Assert.notNull((Object)converter, (String)"'converter' cannot be null");
        this.function = function;
        this.converter = converter;
    }

    public void init(ProcessorContext context) {
        this.processorContext = context;
    }

    public KeyValue<K, R> transform(K key, V value) {
        Headers headers = this.processorContext.headers();
        ConsumerRecord record = new ConsumerRecord(this.processorContext.topic(), this.processorContext.partition(), this.processorContext.offset(), this.processorContext.timestamp(), TimestampType.NO_TIMESTAMP_TYPE, null, 0, 0, key, value, headers);
        Message<?> message = this.converter.toMessage(record, null, null, null);
        message = this.function.exchange(message);
        ArrayList headerList = new ArrayList();
        headers.forEach(header -> headerList.add(header.key()));
        headerList.forEach(name -> headers.remove(name));
        ProducerRecord<?, ?> fromMessage = this.converter.fromMessage(message, "dummy");
        fromMessage.headers().forEach(header -> {
            if (!header.key().equals("kafka_topic")) {
                headers.add(header);
            }
        });
        Object key2 = message.getHeaders().get((Object)"kafka_messageKey");
        return new KeyValue(key2 == null ? key : key2, message.getPayload());
    }

    public void close() {
    }
}

