/*
 * Decompiled with CFR 0.152.
 */
package org.citrusframework.kafka.endpoint;

import jakarta.annotation.Nullable;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.commons.lang3.RandomStringUtils;
import org.citrusframework.DefaultTestActions;
import org.citrusframework.actions.ReceiveMessageBuilderFactory;
import org.citrusframework.common.ShutdownPhase;
import org.citrusframework.endpoint.AbstractEndpoint;
import org.citrusframework.endpoint.Endpoint;
import org.citrusframework.endpoint.EndpointConfiguration;
import org.citrusframework.kafka.endpoint.KafkaConsumer;
import org.citrusframework.kafka.endpoint.KafkaEndpointConfiguration;
import org.citrusframework.kafka.endpoint.KafkaMessageFilter;
import org.citrusframework.kafka.endpoint.KafkaProducer;
import org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector;
import org.citrusframework.kafka.endpoint.selector.KafkaMessageSelector;
import org.citrusframework.kafka.endpoint.selector.KafkaMessageSelectorFactory;
import org.citrusframework.util.StringUtils;
import org.springframework.util.CollectionUtils;

public class KafkaEndpoint
extends AbstractEndpoint
implements ShutdownPhase {
    @Nullable
    private KafkaProducer kafkaProducer;
    @Nullable
    private KafkaConsumer kafkaConsumer;
    private final ThreadLocal<KafkaConsumer> threadLocalKafkaConsumer = ThreadLocal.withInitial(() -> new KafkaConsumer(this.getConsumerName(), this.getEndpointConfiguration()));

    public static SimpleKafkaEndpointBuilder builder() {
        return new SimpleKafkaEndpointBuilder();
    }

    public KafkaEndpoint() {
        this(new KafkaEndpointConfiguration());
    }

    public KafkaEndpoint(KafkaEndpointConfiguration endpointConfiguration) {
        super((EndpointConfiguration)endpointConfiguration);
    }

    static KafkaEndpoint newKafkaEndpoint(@Nullable Boolean randomConsumerGroup, @Nullable String server, @Nullable Long timeout, @Nullable String topic, KafkaMessageSelectorFactory.KafkaMessageSelectorFactories customStrategies, boolean useThreadSafeConsumer) {
        KafkaEndpoint kafkaEndpoint = new KafkaEndpoint();
        if (Boolean.TRUE.equals(randomConsumerGroup)) {
            kafkaEndpoint.getEndpointConfiguration().setConsumerGroup("citrus_kafka_" + RandomStringUtils.insecure().nextAlphabetic(10).toLowerCase());
        }
        if (StringUtils.hasText((String)server)) {
            kafkaEndpoint.getEndpointConfiguration().setServer(server);
        }
        if (Objects.nonNull(timeout)) {
            kafkaEndpoint.getEndpointConfiguration().setTimeout(timeout);
        }
        if (StringUtils.hasText((String)topic)) {
            kafkaEndpoint.getEndpointConfiguration().setTopic(topic);
        }
        if (!CollectionUtils.isEmpty((Map)customStrategies)) {
            kafkaEndpoint.getEndpointConfiguration().getKafkaMessageSelectorFactory().setCustomStrategies(customStrategies);
        }
        kafkaEndpoint.getEndpointConfiguration().setUseThreadSafeConsumer(useThreadSafeConsumer);
        return kafkaEndpoint;
    }

    @Deprecated(forRemoval=true)
    static KafkaEndpoint newKafkaEndpoint(@Nullable org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer, @Nullable org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer, @Nullable Boolean randomConsumerGroup, @Nullable String server, @Nullable Long timeout, @Nullable String topic, KafkaMessageSelectorFactory.KafkaMessageSelectorFactories customStrategies, boolean useThreadSafeConsumer) {
        KafkaEndpoint kafkaEndpoint = KafkaEndpoint.newKafkaEndpoint(randomConsumerGroup, server, timeout, topic, customStrategies, useThreadSafeConsumer);
        if (Objects.nonNull(kafkaConsumer)) {
            kafkaEndpoint.createConsumer().setConsumer(kafkaConsumer);
        }
        if (Objects.nonNull(kafkaProducer)) {
            kafkaEndpoint.createProducer().setProducer(kafkaProducer);
        }
        return kafkaEndpoint;
    }

    public KafkaConsumer createConsumer() {
        if (this.getEndpointConfiguration().useThreadSafeConsumer()) {
            return this.threadLocalKafkaConsumer.get();
        }
        if (Objects.isNull((Object)this.kafkaConsumer)) {
            this.kafkaConsumer = new KafkaConsumer(this.getConsumerName(), this.getEndpointConfiguration());
        }
        return this.kafkaConsumer;
    }

    public KafkaProducer createProducer() {
        if (this.kafkaProducer == null) {
            this.kafkaProducer = new KafkaProducer(this.getProducerName(), this.getEndpointConfiguration());
        }
        return this.kafkaProducer;
    }

    public KafkaEndpointConfiguration getEndpointConfiguration() {
        return (KafkaEndpointConfiguration)super.getEndpointConfiguration();
    }

    public void destroy() {
        if (this.getEndpointConfiguration().useThreadSafeConsumer()) {
            this.threadLocalKafkaConsumer.get().stop();
            this.threadLocalKafkaConsumer.remove();
        } else if (Objects.nonNull((Object)this.kafkaConsumer)) {
            this.kafkaConsumer.stop();
        }
        if (Objects.nonNull(this.kafkaProducer)) {
            this.kafkaProducer.stop();
        }
    }

    public ReceiveMessageBuilderFactory<?, ?> findKafkaEventHeaderEquals(Duration lookbackWindow, String key, String value) {
        return new DefaultTestActions().receive((Endpoint)this).selector(KafkaMessageFilter.kafkaMessageFilter().eventLookbackWindow(lookbackWindow).kafkaMessageSelector(KafkaMessageByHeaderSelector.kafkaHeaderEquals(key, value)).build()).message();
    }

    public static class SimpleKafkaEndpointBuilder {
        private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer;
        private org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer;
        private Boolean randomConsumerGroup;
        private String server;
        private Long timeout;
        private String topic;
        private boolean useThreadSafeConsumer = false;
        private KafkaMessageSelectorFactory.KafkaMessageSelectorFactories customStrategies;

        @Deprecated(forRemoval=true)
        public SimpleKafkaEndpointBuilder kafkaConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer) {
            this.kafkaConsumer = kafkaConsumer;
            return this;
        }

        @Deprecated(forRemoval=true)
        public SimpleKafkaEndpointBuilder kafkaProducer(org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer) {
            this.kafkaProducer = kafkaProducer;
            return this;
        }

        public SimpleKafkaEndpointBuilder randomConsumerGroup(Boolean randomConsumerGroup) {
            this.randomConsumerGroup = randomConsumerGroup;
            return this;
        }

        public SimpleKafkaEndpointBuilder server(String server) {
            this.server = server;
            return this;
        }

        public SimpleKafkaEndpointBuilder timeout(Long timeout) {
            this.timeout = timeout;
            return this;
        }

        public SimpleKafkaEndpointBuilder topic(String topic) {
            this.topic = topic;
            return this;
        }

        public SimpleKafkaEndpointBuilder useThreadSafeConsumer() {
            this.useThreadSafeConsumer = true;
            return this;
        }

        public SimpleKafkaEndpointBuilder withCustomStrategy(Predicate<Map<String, Object>> selector, Function<Map<String, Object>, KafkaMessageSelector> initializer) {
            this.customStrategies.put(selector, initializer);
            return this;
        }

        public KafkaEndpoint build() {
            return KafkaEndpoint.newKafkaEndpoint(this.kafkaConsumer, this.kafkaProducer, this.randomConsumerGroup, this.server, this.timeout, this.topic, this.customStrategies, this.useThreadSafeConsumer);
        }
    }
}

