/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
 * Inbound gateway that receives records from a Kafka listener container, sends each
 * converted record into the messaging flow as a request, and publishes any reply
 * through a {@link KafkaTemplate}.
 *
 * <p>Error-message attributes (the input message and the raw record) are kept in a
 * per-thread holder. Without a retry template the holder is populated and cleared
 * around each record; with a retry template the holder carries the current
 * {@link RetryContext}, installed and removed by the retry listener callbacks.
 *
 * @param <K> the Kafka record key type
 * @param <V> the Kafka record value type
 * @param <R> the value type of records sent by the reply template
 */
public class KafkaInboundGateway<K, V, R>
        extends MessagingGatewaySupport
        implements OrderlyShutdownCapable {

    /** Attribute under which the converted input message is stored for error handling. */
    private static final String INPUT_MESSAGE_ATTRIBUTE = "inputMessage";

    /** Attribute under which the raw consumer record is stored for error handling. */
    private static final String RAW_RECORD_ATTRIBUTE = "kafka_data";

    private static final String DELIVERY_ATTEMPT_HEADER = "deliveryAttempt";

    private static final String SOURCE_DATA_HEADER = "sourceData";

    private static final String CORRELATION_ID_HEADER = "kafka_correlationId";

    private static final String TOPIC_HEADER = "kafka_topic";

    private static final String REPLY_TOPIC_HEADER = "kafka_replyTopic";

    private static final String PARTITION_ID_HEADER = "kafka_partitionId";

    private static final String REPLY_PARTITION_HEADER = "kafka_replyPartition";

    /** Per-thread error-message attributes; holds the {@link RetryContext} when retrying. */
    private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();

    private final IntegrationRecordMessageListener listener = new IntegrationRecordMessageListener();

    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;

    private final KafkaTemplate<K, R> kafkaTemplate;

    private RetryTemplate retryTemplate;

    private RecoveryCallback<? extends Object> recoveryCallback;

    private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

    private boolean bindSourceRecord;

    /**
     * Create a gateway around the given container and reply template.
     *
     * <p>The container's auto-startup flag is switched off because the container is
     * started and stopped by this gateway ({@link #doStart()} / {@link #doStop()}).
     *
     * @param messageListenerContainer the container that delivers records; must not be
     *        {@code null} and must not already have a message listener
     * @param kafkaTemplate the template used to send replies; must not be {@code null}
     * @throws IllegalArgumentException if an argument is {@code null} or the container
     *         already has a listener
     */
    public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaTemplate<K, R> kafkaTemplate) {
        Assert.notNull(messageListenerContainer, "messageListenerContainer is required");
        Assert.notNull(kafkaTemplate, "kafkaTemplate is required");
        Assert.isNull(messageListenerContainer.getContainerProperties().getMessageListener(),
                "Container must not already have a listener");
        this.messageListenerContainer = messageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.kafkaTemplate = kafkaTemplate;
        this.setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
    }

    /**
     * Set the converter used by the internal listener to turn records into messages.
     *
     * @param messageConverter the record message converter
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.listener.setMessageConverter(messageConverter);
    }

    /**
     * Set the payload type; it is passed to the internal listener as its fallback type.
     *
     * @param payloadType the payload type
     */
    public void setPayloadType(Class<?> payloadType) {
        this.listener.setFallbackType(payloadType);
    }

    /**
     * Set a retry template. When present at initialization, the listener is wrapped in
     * a {@link RetryingMessageListenerAdapter} and a {@code deliveryAttempt} header is
     * added to each request message.
     *
     * @param retryTemplate the retry template, or {@code null} for no retries
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Set the recovery callback handed to the retrying adapter; only used when a retry
     * template is also configured.
     *
     * @param recoveryCallback the recovery callback
     */
    public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Set a callback invoked with the assignments and the seek callback whenever the
     * listener is notified that partitions were assigned.
     *
     * @param onPartitionsAssignedCallback the callback, or {@code null} for none
     */
    public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
        this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
    }

    /**
     * Set whether the consumer record is added to each request message under the
     * {@code sourceData} header.
     *
     * @param bindSourceRecord {@code true} to bind the record
     */
    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    /**
     * Install the message listener on the container. With a retry template, the
     * internal listener is wrapped in a {@link RetryingMessageListenerAdapter} and is
     * also registered on the template as a {@link RetryListener}.
     */
    protected void onInit() {
        super.onInit();
        // Typed as Object: it holds either the internal listener or the retrying
        // adapter wrapping it, and the container properties accept any listener object.
        Object containerListener = this.listener;
        if (this.retryTemplate != null) {
            containerListener =
                    new RetryingMessageListenerAdapter<>(this.listener, this.retryTemplate, this.recoveryCallback);
            this.retryTemplate.registerListener(this.listener);
        }
        this.messageListenerContainer.getContainerProperties().setMessageListener(containerListener);
    }

    /** Start the underlying listener container. */
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    /** Stop the underlying listener container. */
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    /**
     * @return the component type identifier, {@code "kafka:inbound-gateway"}
     */
    public String getComponentType() {
        return "kafka:inbound-gateway";
    }

    /**
     * Stop the listener container ahead of shutdown.
     *
     * @return the value of {@code getPhase()}
     */
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return this.getPhase();
    }

    /**
     * @return the value of {@code getPhase()}
     */
    public int afterShutdown() {
        return this.getPhase();
    }

    /**
     * Record the input message and raw record as error-message attributes.
     *
     * <p>Without a retry template a fresh attribute accessor is installed in the
     * thread-local holder, but only when an error channel is configured. With a retry
     * template the accessor already in the holder (if any) is reused.
     *
     * @param record the raw record, stored under {@code kafka_data}
     * @param message the converted message, stored under {@code inputMessage}
     */
    private void setAttributesIfNecessary(Object record, Message<?> message) {
        boolean retrying = this.retryTemplate != null;
        boolean needHolder = this.getErrorChannel() != null && !retrying;
        if (needHolder) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (needHolder || retrying) {
            AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
            if (attributes != null) {
                attributes.setAttribute(INPUT_MESSAGE_ATTRIBUTE, message);
                attributes.setAttribute(RAW_RECORD_ATTRIBUTE, record);
            }
        }
    }

    /**
     * Return the attributes held for the current thread, falling back to the superclass
     * implementation when none are held.
     *
     * @param message the message being processed
     * @return the error-message attributes
     */
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
        return attributes != null ? attributes : super.getErrorMessageAttributes(message);
    }

    /**
     * Listener installed on the container. It converts each record to a message, runs
     * the request/reply exchange, sends the reply, and maintains the thread-local
     * attribute holder via the {@link RetryListener} callbacks.
     *
     * <p>Members of the enclosing gateway are always accessed through
     * {@code KafkaInboundGateway.this} so they cannot be confused with members
     * inherited from the adapter superclass.
     */
    private class IntegrationRecordMessageListener
            extends RecordMessagingMessageListenerAdapter<K, V>
            implements RetryListener {

        IntegrationRecordMessageListener() {
            super(null, null);
        }

        /**
         * Forward the assignment notification to the configured callback, if any.
         *
         * @param assignments the assigned partitions and their offsets
         * @param callback the seek callback
         */
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
            if (KafkaInboundGateway.this.onPartitionsAssignedSeekCallback != null) {
                KafkaInboundGateway.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
            }
        }

        /**
         * Handle one record: convert it, send it as a request, and publish the reply
         * (if there is one) with the reply template.
         *
         * <p>A {@link RuntimeException} raised while preparing the request is wrapped in
         * a {@link ConversionException} and sent to the error channel when one is
         * configured; otherwise it is logged. Either way the exception is not rethrown.
         *
         * @param record the consumer record
         * @param acknowledgment the acknowledgment passed to the message conversion
         * @param consumer the consumer passed to the message conversion
         */
        public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message<?> message = null;
            try {
                message = this.enhanceHeaders(this.toMessagingMessage(record, acknowledgment, consumer), record);
                KafkaInboundGateway.this.setAttributesIfNecessary(record, message);
            }
            catch (RuntimeException e) {
                if (KafkaInboundGateway.this.getErrorChannel() != null) {
                    KafkaInboundGateway.this.messagingTemplate.send(
                            KafkaInboundGateway.this.getErrorChannel(),
                            KafkaInboundGateway.this.buildErrorMessage(null,
                                    new ConversionException("Failed to convert to message for: " + record, e)));
                }
                else {
                    // No error channel to report to: keep the failure visible in the log
                    // instead of dropping the exception without a trace.
                    KafkaInboundGateway.this.logger.warn("Failed to convert to message for: " + record, e);
                }
            }
            if (message == null) {
                if (KafkaInboundGateway.this.logger.isDebugEnabled()) {
                    KafkaInboundGateway.this.logger.debug("Converter returned a null message for: " + record);
                }
                return;
            }
            try {
                Message<?> reply = KafkaInboundGateway.this.sendAndReceiveMessage(message);
                if (reply != null) {
                    KafkaInboundGateway.this.kafkaTemplate.send(this.enhanceReply(message, reply));
                }
            }
            finally {
                // When retrying, the holder is owned by open()/close() instead.
                if (KafkaInboundGateway.this.retryTemplate == null) {
                    ATTRIBUTES_HOLDER.remove();
                }
            }
        }

        /**
         * Add the {@code deliveryAttempt} header (when retrying) and the
         * {@code sourceData} header (when source-record binding is enabled).
         *
         * <p>{@link KafkaMessageHeaders} are updated in place through their raw header
         * map; any other message is rebuilt with the extra headers.
         *
         * @param message the converted message
         * @param record the record the message was converted from
         * @return the same message, or a rebuilt copy carrying the extra headers
         */
        private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
            if (message.getHeaders() instanceof KafkaMessageHeaders) {
                Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
                if (KafkaInboundGateway.this.retryTemplate != null) {
                    rawHeaders.put(DELIVERY_ATTEMPT_HEADER, this.currentDeliveryAttempt());
                }
                if (KafkaInboundGateway.this.bindSourceRecord) {
                    rawHeaders.put(SOURCE_DATA_HEADER, record);
                }
                return message;
            }
            MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
            if (KafkaInboundGateway.this.retryTemplate != null) {
                builder.setHeader(DELIVERY_ATTEMPT_HEADER, this.currentDeliveryAttempt());
            }
            if (KafkaInboundGateway.this.bindSourceRecord) {
                builder.setHeader(SOURCE_DATA_HEADER, record);
            }
            return builder.build();
        }

        /**
         * Compute the delivery attempt from the retry context held for this thread.
         *
         * @return the context's retry count plus one
         */
        private AtomicInteger currentDeliveryAttempt() {
            RetryContext context = (RetryContext) ATTRIBUTES_HOLDER.get();
            return new AtomicInteger(context.getRetryCount() + 1);
        }

        /**
         * Fill in reply routing headers the reply does not already carry, taking the
         * values from the request: correlation id, topic (from the request's reply
         * topic) and partition id (from the request's reply partition).
         *
         * @param message the request message
         * @param reply the reply message
         * @return the reply itself when nothing was added, otherwise a rebuilt copy
         */
        private Message<?> enhanceReply(Message<?> message, Message<?> reply) {
            MessageHeaders requestHeaders = message.getHeaders();
            AbstractIntegrationMessageBuilder<?> builder = null;
            builder = this.copyHeaderIfAbsent(builder, reply, requestHeaders,
                    CORRELATION_ID_HEADER, CORRELATION_ID_HEADER);
            builder = this.copyHeaderIfAbsent(builder, reply, requestHeaders,
                    REPLY_TOPIC_HEADER, TOPIC_HEADER);
            builder = this.copyHeaderIfAbsent(builder, reply, requestHeaders,
                    REPLY_PARTITION_HEADER, PARTITION_ID_HEADER);
            return builder != null ? builder.build() : reply;
        }

        /**
         * Copy one request header onto the reply when the reply lacks the target header
         * and the request supplies a value. The builder is created from the reply on
         * first use.
         *
         * @param builder the builder accumulated so far, or {@code null} if none yet
         * @param reply the reply message
         * @param requestHeaders the headers of the request message
         * @param requestHeader the request header to read
         * @param replyHeader the reply header to set
         * @return the builder to carry forward; {@code null} if still none is needed
         */
        private AbstractIntegrationMessageBuilder<?> copyHeaderIfAbsent(AbstractIntegrationMessageBuilder<?> builder,
                Message<?> reply, MessageHeaders requestHeaders, String requestHeader, String replyHeader) {
            if (reply.getHeaders().get(replyHeader) != null) {
                return builder;
            }
            Object value = requestHeaders.get(requestHeader);
            if (value == null) {
                return builder;
            }
            AbstractIntegrationMessageBuilder<?> target = builder != null
                    ? builder
                    : KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(reply);
            return target.setHeader(replyHeader, value);
        }

        /**
         * Store the retry context in the thread-local holder when a retry template is
         * configured.
         *
         * @return always {@code true}
         */
        public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
            if (KafkaInboundGateway.this.retryTemplate != null) {
                ATTRIBUTES_HOLDER.set(context);
            }
            return true;
        }

        /** Clear the thread-local holder. */
        public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            ATTRIBUTES_HOLDER.remove();
        }

        /** No action is taken on individual retry errors. */
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        }
    }
}

