/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.reactorkafka;

import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.common.BinderHeaderMapper;
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.support.ConsumerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.support.ProducerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.utils.BindingUtils;
import org.springframework.cloud.stream.binder.reactorkafka.ReceiverOptionsCustomizer;
import org.springframework.cloud.stream.binder.reactorkafka.SenderOptionsCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

@Deprecated(since="4.3", forRemoval=true)
public class ReactorKafkaBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
    private static final Log logger = LogFactory.getLog(ReactorKafkaBinder.class);
    private final KafkaBinderConfigurationProperties configurationProperties;
    private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
    private ConsumerConfigCustomizer consumerConfigCustomizer;
    private ProducerConfigCustomizer producerConfigCustomizer;
    private ReceiverOptionsCustomizer<Object, Object> receiverOptionsCustomizer = (name, opts) -> opts;
    private SenderOptionsCustomizer<Object, Object> senderOptionsCustomizer = (name, opts) -> opts;
    private final Map<String, TopicInformation> topicsInUse = new ConcurrentHashMap<String, TopicInformation>();
    private final Map<String, MessageProducerSupport> messageProducers = new ConcurrentHashMap<String, MessageProducerSupport>();
    private final ObservationRegistry observationRegistry;

    public ReactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties, KafkaTopicProvisioner provisioner, @Nullable ObservationRegistry observationRegistry) {
        super(new String[0], (ProvisioningProvider)provisioner, null, null);
        this.configurationProperties = configurationProperties;
        this.observationRegistry = observationRegistry;
    }

    public void setConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
        this.consumerConfigCustomizer = consumerConfigCustomizer;
    }

    public void setProducerConfigCustomizer(ProducerConfigCustomizer producerConfigCustomizer) {
        this.producerConfigCustomizer = producerConfigCustomizer;
    }

    public void receiverOptionsCustomizers(ObjectProvider<ReceiverOptionsCustomizer> customizers) {
        if (customizers.getIfUnique() != null) {
            this.receiverOptionsCustomizer = (ReceiverOptionsCustomizer)customizers.getIfUnique();
        } else {
            List list = customizers.orderedStream().toList();
            ReceiverOptionsCustomizer customizer = (name, opts) -> {
                ReceiverOptions last = null;
                for (ReceiverOptionsCustomizer cust : list) {
                    last = (ReceiverOptions)cust.apply(name, opts);
                }
                return last;
            };
            if (!list.isEmpty()) {
                this.receiverOptionsCustomizer = customizer;
            }
        }
    }

    public void senderOptionsCustomizers(ObjectProvider<SenderOptionsCustomizer> customizers) {
        if (customizers.getIfUnique() != null) {
            this.senderOptionsCustomizer = (SenderOptionsCustomizer)customizers.getIfUnique();
        } else {
            List list = customizers.orderedStream().toList();
            SenderOptionsCustomizer customizer = (name, opts) -> {
                SenderOptions last = null;
                for (SenderOptionsCustomizer cust : list) {
                    last = (SenderOptions)cust.apply(name, opts);
                }
                return last;
            };
            if (!list.isEmpty()) {
                this.senderOptionsCustomizer = customizer;
            }
        }
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
        Map configs = BindingUtils.createProducerConfigs(producerProperties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        if (this.producerConfigCustomizer != null) {
            this.producerConfigCustomizer.configure(configs, producerProperties.getBindingName(), destination.getName());
        }
        Map props = BindingUtils.createProducerConfigs(producerProperties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(props);
        Collection partitions = ((KafkaTopicProvisioner)this.provisioningProvider).getPartitionsForTopic(producerProperties.getPartitionCount(), false, () -> {
            Producer producer = producerFactory.createProducer();
            List partitionsFor = producer.partitionsFor(destination.getName());
            producer.close();
            return partitionsFor;
        }, destination.getName());
        this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions, false));
        SenderOptions opts = (SenderOptions)this.senderOptionsCustomizer.apply(producerProperties.getBindingName(), SenderOptions.create((Map)configs));
        if (this.configurationProperties.isEnableObservation() && this.observationRegistry != null) {
            opts = opts.withObservation(this.observationRegistry);
        }
        MessagingMessageConverter converter = new MessagingMessageConverter();
        AbstractApplicationContext applicationContext = this.getApplicationContext();
        FluxMessageChannel resultChannel = null;
        String channelName = ((KafkaProducerProperties)producerProperties.getExtension()).getRecordMetadataChannel();
        if (channelName != null && applicationContext.containsBean(channelName)) {
            resultChannel = (FluxMessageChannel)applicationContext.getBean(channelName, FluxMessageChannel.class);
        }
        return new ReactorMessageHandler((SenderOptions<Object, Object>)opts, (RecordMessageConverter)converter, destination.getName(), resultChannel);
    }

    Map<String, TopicInformation> getTopicsInUse() {
        return this.topicsInUse;
    }

    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, final ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        boolean anonymous = !StringUtils.hasText((String)group);
        Object consumerGroup = anonymous ? "anonymous." + UUID.randomUUID() : group;
        Map configs = BindingUtils.createConsumerConfigs((boolean)anonymous, (String)consumerGroup, properties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        String destinations = destination.getName();
        if (this.consumerConfigCustomizer != null) {
            this.consumerConfigCustomizer.configure(configs, properties.getBindingName(), destinations);
        }
        final MessageConverter converter = BindingUtils.getConsumerMessageConverter((ApplicationContext)this.getApplicationContext(), properties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        Assert.isInstanceOf(RecordMessageConverter.class, (Object)converter);
        List<String> destList = Arrays.stream(StringUtils.commaDelimitedListToStringArray((String)destinations)).map(String::trim).toList();
        ReceiverOptions opts = ReceiverOptions.create((Map)configs).addAssignListener(parts -> logger.info((Object)("Assigned: " + parts)));
        opts = ((KafkaConsumerProperties)properties.getExtension()).isDestinationIsPattern() ? opts.subscription(Pattern.compile(destinations)) : opts.subscription(destList);
        final ReceiverOptions finalOpts = opts = (ReceiverOptions)this.receiverOptionsCustomizer.apply(properties.getBindingName(), opts);
        Map props = BindingUtils.createConsumerConfigs((boolean)anonymous, (String)consumerGroup, properties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory(props);
        int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
        boolean groupManagement = ((KafkaConsumerProperties)properties.getExtension()).isAutoRebalanceEnabled();
        if (!properties.isMultiplex()) {
            ((KafkaTopicProvisioner)this.provisioningProvider).getListenedPartitions((String)consumerGroup, properties, (ConsumerFactory)factory, partitionCount, ((KafkaConsumerProperties)properties.getExtension()).isDestinationIsPattern(), groupManagement, destination.getName(), this.topicsInUse);
        } else {
            for (String name : StringUtils.commaDelimitedListToStringArray((String)destination.getName())) {
                ((KafkaTopicProvisioner)this.provisioningProvider).getListenedPartitions((String)consumerGroup, properties, (ConsumerFactory)factory, partitionCount, ((KafkaConsumerProperties)properties.getExtension()).isDestinationIsPattern(), groupManagement, name.trim(), this.topicsInUse);
            }
        }
        class ReactorMessageProducer
        extends MessageProducerSupport {
            private final List<KafkaReceiver<Object, Object>> receivers = new ArrayList<KafkaReceiver<Object, Object>>();

            ReactorMessageProducer() {
                for (int i = 0; i < properties.getConcurrency(); ++i) {
                    this.receivers.add((KafkaReceiver<Object, Object>)KafkaReceiver.create((ReceiverOptions)finalOpts));
                }
            }

            protected void doStart() {
                ArrayList<Flux> fluxes = new ArrayList<Flux>();
                int concurrency = properties.getConcurrency();
                boolean autoCommit = ((KafkaConsumerProperties)properties.getExtension()).isReactiveAutoCommit();
                boolean atMostOnce = ((KafkaConsumerProperties)properties.getExtension()).isReactiveAtMostOnce();
                Assert.state((!autoCommit || !atMostOnce ? 1 : 0) != 0, (String)"Cannot set both reactiveAutoCommit and reactiveAtMostOnce");
                for (int i = 0; i < concurrency; ++i) {
                    Flux receive = null;
                    KafkaReceiver<Object, Object> kafkaReceiver = this.receivers.get(i);
                    if (atMostOnce) {
                        receive = kafkaReceiver.receiveAtmostOnce();
                    } else if (!autoCommit) {
                        receive = kafkaReceiver.receive();
                    }
                    if (autoCommit) {
                        fluxes.add(kafkaReceiver.receiveAutoAck().map(inner -> new GenericMessage(inner)));
                        continue;
                    }
                    fluxes.add(receive.map(record -> {
                        Message message = ((RecordMessageConverter)converter).toMessage(record, null, null, null);
                        return this.addAckHeaderIfNeeded(atMostOnce, (ConsumerRecord<Object, Object>)record, (Message<Object>)message);
                    }));
                }
                if (concurrency == 1) {
                    this.subscribeToPublisher((Publisher)fluxes.get(0));
                } else {
                    this.subscribeToPublisher((Publisher)Flux.merge(fluxes));
                }
            }

            private Message<Object> addAckHeaderIfNeeded(boolean autoCommit, ConsumerRecord<Object, Object> record, Message<Object> message) {
                if (!autoCommit) {
                    MessageHeaders messageHeaders = message.getHeaders();
                    if (messageHeaders instanceof KafkaMessageHeaders) {
                        KafkaMessageHeaders headers = (KafkaMessageHeaders)messageHeaders;
                        headers.getRawHeaders().put("kafka_acknowledgment", ((ReceiverRecord)record).receiverOffset());
                    } else {
                        message = ((MessageBuilder)MessageBuilder.fromMessage(message).setHeader("kafka_acknowledgment", (Object)((ReceiverRecord)record).receiverOffset())).build();
                    }
                }
                return message;
            }
        }
        ReactorMessageProducer reactorMessageProducer = new ReactorMessageProducer();
        this.messageProducers.put((String)consumerGroup, reactorMessageProducer);
        return reactorMessageProducer;
    }

    public Map<String, MessageProducerSupport> getMessageProducers() {
        return this.messageProducers;
    }

    public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
        return (KafkaConsumerProperties)this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
        return (KafkaProducerProperties)this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    private static class ReactorMessageHandler
    extends AbstractMessageHandler
    implements Lifecycle {
        private final RecordMessageConverter converter;
        private final String topic;
        private final SenderOptions<Object, Object> senderOptions;
        @Nullable
        private final FluxMessageChannel results;
        private volatile KafkaSender<Object, Object> sender;
        private volatile boolean running;

        ReactorMessageHandler(SenderOptions<Object, Object> opts, RecordMessageConverter converter, String topic, @Nullable FluxMessageChannel results) {
            this.senderOptions = opts;
            this.converter = converter;
            ((MessagingMessageConverter)converter).setHeaderMapper((KafkaHeaderMapper)new BinderHeaderMapper());
            this.topic = topic;
            this.results = results;
        }

        protected void handleMessageInternal(Message<?> message) {
            if (this.sender != null) {
                Object correlation = message.getHeaders().get((Object)"correlationId");
                if (correlation == null) {
                    correlation = UUID.randomUUID();
                }
                SenderRecord sr = SenderRecord.create((ProducerRecord)this.converter.fromMessage(message, this.topic), (Object)correlation);
                Flux result = this.sender.send((Publisher)Flux.just((Object)sr)).contextCapture();
                result.subscribe(res -> {
                    if (this.results != null) {
                        this.results.send(((MessageBuilder)MessageBuilder.withPayload((Object)res).copyHeaders((Map)message.getHeaders())).build());
                    }
                });
            }
        }

        public synchronized void start() {
            if (!this.running) {
                this.sender = KafkaSender.create(this.senderOptions);
                this.running = true;
            }
        }

        public synchronized void stop() {
            if (this.running) {
                KafkaSender<Object, Object> theSender = this.sender;
                this.sender = null;
                theSender.close();
                this.running = false;
            }
        }

        public boolean isRunning() {
            return this.running;
        }
    }
}

