/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.springframework.cloud.stream.binder.kafka.streams.DeserializationExceptionHandler;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBindingInformationCatalogue;
import org.springframework.cloud.stream.binder.kafka.streams.SendToDlqAndContinue;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import tools.jackson.databind.ObjectMapper;

public class KafkaStreamsMessageConversionDelegate {
    private static final Log LOG = LogFactory.getLog(KafkaStreamsMessageConversionDelegate.class);
    private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal();
    private final CompositeMessageConverter compositeMessageConverter;
    private final SendToDlqAndContinue sendToDlqAndContinue;
    private final KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue;
    private final KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties;
    Exception[] failedWithDeserException = new Exception[1];

    KafkaStreamsMessageConversionDelegate(CompositeMessageConverter compositeMessageConverter, SendToDlqAndContinue sendToDlqAndContinue, KafkaStreamsBindingInformationCatalogue kstreamBindingInformationCatalogue, KafkaStreamsBinderConfigurationProperties kstreamBinderConfigurationProperties) {
        this.compositeMessageConverter = compositeMessageConverter;
        this.sendToDlqAndContinue = sendToDlqAndContinue;
        this.kstreamBindingInformationCatalogue = kstreamBindingInformationCatalogue;
        this.kstreamBinderConfigurationProperties = kstreamBinderConfigurationProperties;
    }

    public KStream serializeOnOutbound(KStream<?, ?> outboundBindTarget) {
        String contentType = this.kstreamBindingInformationCatalogue.getContentType(outboundBindTarget);
        CompositeMessageConverter messageConverter = this.compositeMessageConverter;
        final ThreadLocal<PerRecordContentTypeHolder> perRecordContentTypeHolderThreadLocal = ThreadLocal.withInitial(PerRecordContentTypeHolder::new);
        KStream kStreamWithEnrichedHeaders = outboundBindTarget.filter((k, v) -> v != null).mapValues(arg_0 -> KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$1(contentType, (MessageConverter)messageConverter, perRecordContentTypeHolderThreadLocal, arg_0));
        kStreamWithEnrichedHeaders.process(() -> new Processor(){
            ProcessorContext context;
            final /* synthetic */ KafkaStreamsMessageConversionDelegate this$0;
            {
                this.this$0 = this$0;
            }

            public void init(ProcessorContext context) {
                this.context = context;
            }

            public void process(Record record) {
                if (((PerRecordContentTypeHolder)perRecordContentTypeHolderThreadLocal.get()).contentType != null) {
                    block3: {
                        record.headers().remove("contentType");
                        try {
                            RecordHeader header = new RecordHeader("contentType", new ObjectMapper().writeValueAsBytes((Object)((PerRecordContentTypeHolder)perRecordContentTypeHolderThreadLocal.get()).contentType));
                            record.headers().add((Header)header);
                        }
                        catch (Exception e) {
                            if (!LOG.isDebugEnabled()) break block3;
                            LOG.debug((Object)"Could not add content type header");
                        }
                    }
                    ((PerRecordContentTypeHolder)perRecordContentTypeHolderThreadLocal.get()).unsetContentType();
                }
            }

            public void close() {
            }
        }, new String[0]);
        return kStreamWithEnrichedHeaders;
    }

    public KStream deserializeOnInbound(Class<?> valueClass, KStream<?, ?> bindingTarget) {
        CompositeMessageConverter messageConverter = this.compositeMessageConverter;
        PerRecordContentTypeHolder perRecordContentTypeHolder = new PerRecordContentTypeHolder();
        this.resolvePerRecordContentType(bindingTarget, perRecordContentTypeHolder);
        Map branchGraph = bindingTarget.split().branch((arg_0, arg_1) -> this.lambda$deserializeOnInbound$0(perRecordContentTypeHolder, valueClass, (MessageConverter)messageConverter, arg_0, arg_1)).branch((k, v) -> true).noDefaultBranch();
        KStream[] branch = branchGraph.values().toArray(new KStream[0]);
        this.processErrorFromDeserialization(bindingTarget, branch[1], this.failedWithDeserException);
        return branch[0].mapValues(o2 -> {
            Object objectValue = KafkaStreamsMessageConversionDelegate.keyValueThreadLocal.get().value;
            keyValueThreadLocal.remove();
            return objectValue;
        });
    }

    private void resolvePerRecordContentType(KStream<?, ?> outboundBindTarget, final PerRecordContentTypeHolder perRecordContentTypeHolder) {
        outboundBindTarget.process(() -> new Processor(){
            ProcessorContext context;
            final /* synthetic */ KafkaStreamsMessageConversionDelegate this$0;
            {
                this.this$0 = this$0;
            }

            public void init(ProcessorContext context) {
                this.context = context;
            }

            public void process(Record record) {
                Iterable contentTypes = record.headers().headers("contentType");
                if (contentTypes != null && contentTypes.iterator().hasNext()) {
                    String contentType = new String(((Header)contentTypes.iterator().next()).value());
                    String cleanContentType = StringUtils.replace((String)contentType, (String)"\"", (String)"");
                    perRecordContentTypeHolder.setContentType(cleanContentType);
                }
            }

            public void close() {
            }
        }, new String[0]);
    }

    private void convertAndSetMessage(Object o, Class<?> valueClass, MessageConverter messageConverter, Message<?> msg) {
        Object result = valueClass.isAssignableFrom(msg.getPayload().getClass()) ? msg.getPayload() : messageConverter.fromMessage(msg, valueClass);
        Assert.notNull((Object)result, (String)("Failed to convert message " + String.valueOf(msg)));
        keyValueThreadLocal.set((KeyValue<Object, Object>)new KeyValue(o, result));
    }

    private void processErrorFromDeserialization(final KStream<?, ?> bindingTarget, KStream<?, ?> branch, final Exception[] exception) {
        branch.process(() -> new Processor(){
            ProcessorContext context;
            final /* synthetic */ KafkaStreamsMessageConversionDelegate this$0;
            {
                this.this$0 = this$0;
            }

            public void init(ProcessorContext context) {
                this.context = context;
            }

            public void process(Record record) {
                Object o = record.key();
                Object o2 = record.value();
                if (o2 != null) {
                    if (this.this$0.kstreamBindingInformationCatalogue.isDlqEnabled(bindingTarget)) {
                        if (o2 instanceof Message) {
                            Message message = (Message)o2;
                            Serde<?> keySerde = this.this$0.kstreamBindingInformationCatalogue.getKeySerde(bindingTarget);
                            Serializer keySerializer = keySerde.serializer();
                            byte[] keyBytes = keySerializer.serialize(null, o);
                            if (this.context.recordMetadata().isPresent()) {
                                RecordMetadata recordMetadata = (RecordMetadata)this.context.recordMetadata().get();
                                ConsumerRecord consumerRecord = new ConsumerRecord(recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), (Object)keyBytes, message.getPayload());
                                this.this$0.sendToDlqAndContinue.sendToDlq(consumerRecord, exception[0]);
                            }
                        } else {
                            RecordMetadata recordMetadata = (RecordMetadata)this.context.recordMetadata().get();
                            ConsumerRecord consumerRecord = new ConsumerRecord(recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), o, o2);
                            this.this$0.sendToDlqAndContinue.sendToDlq(consumerRecord, exception[0]);
                        }
                    } else {
                        if (this.this$0.kstreamBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndFail) {
                            throw new IllegalStateException("Inbound deserialization failed. Stopping further processing of records.");
                        }
                        if (this.this$0.kstreamBinderConfigurationProperties.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndContinue) {
                            LOG.error((Object)"Inbound deserialization failed. Skipping this record and continuing.");
                        }
                    }
                }
            }

            public void close() {
            }
        }, new String[0]);
    }

    private /* synthetic */ boolean lambda$deserializeOnInbound$0(PerRecordContentTypeHolder perRecordContentTypeHolder, Class valueClass, MessageConverter messageConverter, Object o, Object o2) {
        boolean isValidRecord = false;
        try {
            if (o2 != null) {
                if (o2 instanceof Message || o2 instanceof String || o2 instanceof byte[]) {
                    Message m1 = null;
                    if (o2 instanceof Message) {
                        Message message = (Message)o2;
                        m1 = perRecordContentTypeHolder.contentType != null ? MessageBuilder.fromMessage((Message)message).setHeader("contentType", (Object)perRecordContentTypeHolder.contentType).build() : message;
                    } else {
                        m1 = perRecordContentTypeHolder.contentType != null ? MessageBuilder.withPayload((Object)o2).setHeader("contentType", (Object)perRecordContentTypeHolder.contentType).build() : MessageBuilder.withPayload((Object)o2).build();
                    }
                    this.convertAndSetMessage(o, valueClass, messageConverter, m1);
                } else {
                    keyValueThreadLocal.set((KeyValue<Object, Object>)new KeyValue(o, o2));
                }
                isValidRecord = true;
            } else {
                LOG.info((Object)"Received a tombstone record. This will be skipped from further processing.");
            }
        }
        catch (Exception e) {
            LOG.warn((Object)"Deserialization has failed. This will be skipped from further processing.", (Throwable)e);
            this.failedWithDeserException[0] = e;
        }
        return isValidRecord;
    }

    private static /* synthetic */ Object lambda$serializeOnOutbound$1(String contentType, MessageConverter messageConverter, ThreadLocal perRecordContentTypeHolderThreadLocal, Object v) {
        Message m;
        Message message = v instanceof Message ? (m = (Message)v) : MessageBuilder.withPayload((Object)v).build();
        HashMap<String, String> headers = new HashMap<String, String>((Map<String, String>)message.getHeaders());
        if (StringUtils.hasText((String)contentType)) {
            headers.put("contentType", contentType);
        }
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message convertedMessage = messageConverter.toMessage(message.getPayload(), messageHeaders);
        ((PerRecordContentTypeHolder)perRecordContentTypeHolderThreadLocal.get()).setContentType((String)messageHeaders.get((Object)"contentType"));
        return Objects.requireNonNull(convertedMessage).getPayload();
    }

    private static class PerRecordContentTypeHolder {
        String contentType;

        private PerRecordContentTypeHolder() {
        }

        void setContentType(String contentType) {
            this.contentType = contentType;
        }

        void unsetContentType() {
            this.contentType = null;
        }
    }
}

