/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.reactorkafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.support.ConsumerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.support.ProducerConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.utils.BindingUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.context.Lifecycle;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class ReactorKafkaBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
    private static final Log logger = LogFactory.getLog(ReactorKafkaBinder.class);
    private final KafkaBinderConfigurationProperties configurationProperties;
    private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
    private ConsumerConfigCustomizer consumerConfigCustomizer;
    private ProducerConfigCustomizer producerConfigCustomizer;

    public ReactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties, KafkaTopicProvisioner provisioner) {
        super(new String[0], (ProvisioningProvider)provisioner, null, null);
        this.configurationProperties = configurationProperties;
    }

    public void setConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
        this.consumerConfigCustomizer = consumerConfigCustomizer;
    }

    public void setProducerConfigCustomizer(ProducerConfigCustomizer producerConfigCustomizer) {
        this.producerConfigCustomizer = producerConfigCustomizer;
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
        Map configs = BindingUtils.createProducerConfigs(producerProperties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        if (this.producerConfigCustomizer != null) {
            this.producerConfigCustomizer.configure(configs, producerProperties.getBindingName(), destination.getName());
        }
        SenderOptions opts = SenderOptions.create((Map)configs);
        MessagingMessageConverter converter = new MessagingMessageConverter();
        return new ReactorMessageHandler((SenderOptions<Object, Object>)opts, (RecordMessageConverter)converter, destination.getName());
    }

    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, final ExtendedConsumerProperties<KafkaConsumerProperties> properties) throws Exception {
        boolean anonymous = !StringUtils.hasText((String)group);
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
        Map configs = BindingUtils.createConsumerConfigs((boolean)anonymous, (String)consumerGroup, properties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        if (this.consumerConfigCustomizer != null) {
            this.consumerConfigCustomizer.configure(configs, properties.getBindingName(), destination.getName());
        }
        final MessageConverter converter = BindingUtils.getConsumerMessageConverter((ApplicationContext)this.getApplicationContext(), properties, (KafkaBinderConfigurationProperties)this.configurationProperties);
        Assert.isInstanceOf(RecordMessageConverter.class, (Object)converter);
        final ReceiverOptions opts = ReceiverOptions.create((Map)configs).addAssignListener(parts -> logger.info((Object)("Assigned: " + parts))).subscription(Collections.singletonList(destination.getName()));
        class ReactorMessageProducer
        extends MessageProducerSupport {
            private final List<KafkaReceiver<Object, Object>> receivers = new ArrayList<KafkaReceiver<Object, Object>>();

            ReactorMessageProducer() {
                for (int i = 0; i < properties.getConcurrency(); ++i) {
                    this.receivers.add((KafkaReceiver<Object, Object>)KafkaReceiver.create((ReceiverOptions)opts));
                }
            }

            protected void doStart() {
                ArrayList<Flux> fluxes = new ArrayList<Flux>();
                int concurrency = properties.getConcurrency();
                for (int i = 0; i < concurrency; ++i) {
                    fluxes.add(this.receivers.get(i).receive().map(record -> ((RecordMessageConverter)converter).toMessage((ConsumerRecord)record, null, null, null)));
                }
                if (concurrency == 1) {
                    this.subscribeToPublisher((Publisher)fluxes.get(0));
                } else {
                    this.subscribeToPublisher((Publisher)Flux.merge(fluxes));
                }
            }
        }
        return new ReactorMessageProducer();
    }

    public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
        return (KafkaConsumerProperties)this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
        return (KafkaProducerProperties)this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    private static class ReactorMessageHandler
    extends AbstractMessageHandler
    implements Lifecycle {
        private final RecordMessageConverter converter;
        private final String topic;
        private final SenderOptions<Object, Object> senderOptions;
        private volatile KafkaSender<Object, Object> sender;
        private volatile boolean running;

        ReactorMessageHandler(SenderOptions<Object, Object> opts, RecordMessageConverter converter, String topic) {
            this.senderOptions = opts;
            this.converter = converter;
            this.topic = topic;
        }

        protected void handleMessageInternal(Message<?> message) {
            Object sendResultHeader = message.getHeaders().get((Object)"sendResult");
            Sinks.One sink = Sinks.one();
            if (sendResultHeader instanceof AtomicReference) {
                AtomicReference result = (AtomicReference)sendResultHeader;
                result.set(sink.asMono());
            }
            if (this.sender != null) {
                UUID uuid = UUID.randomUUID();
                SenderRecord sr = SenderRecord.create((ProducerRecord)this.converter.fromMessage(message, this.topic), (Object)uuid);
                Flux result = this.sender.send((Publisher)Flux.just((Object)sr));
                result.subscribe(res -> sink.emitValue((Object)res.recordMetadata(), null));
            } else {
                sink.emitError((Throwable)new IllegalStateException("Handler is not running"), null);
            }
        }

        public synchronized void start() {
            if (!this.running) {
                this.sender = KafkaSender.create(this.senderOptions);
                this.running = true;
            }
        }

        public synchronized void stop() {
            if (this.running) {
                KafkaSender<Object, Object> theSender = this.sender;
                this.sender = null;
                theSender.close();
                this.running = false;
            }
        }

        public boolean isRunning() {
            return this.running;
        }
    }
}

