/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.pulsar.reactive.listener;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder;
import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;
import org.springframework.pulsar.reactive.listener.ReactivePulsarMessageHandler;
import org.springframework.pulsar.reactive.listener.ReactivePulsarMessageListenerContainer;
import org.springframework.pulsar.reactive.listener.ReactivePulsarOneByOneMessageHandler;
import org.springframework.pulsar.reactive.listener.ReactivePulsarStreamingHandler;
import org.springframework.util.CollectionUtils;

/**
 * Default {@link ReactivePulsarMessageListenerContainer} implementation.
 *
 * <p>On {@link #start()} the container obtains a consumer from the configured
 * {@link ReactivePulsarConsumerFactory}, wraps it in a {@link ReactiveMessagePipeline}
 * driven by the message handler held in the container properties, and starts that
 * pipeline. {@link #stop()} closes the pipeline again. Both {@code start()} and
 * {@code stop()} run while holding an internal lifecycle lock.
 *
 * @param <T> the type parameter shared by the consumer factory, the container
 * properties and the consumer customizer used by this container
 */
public class DefaultReactivePulsarMessageListenerContainer<T>
        implements ReactivePulsarMessageListenerContainer<T> {
    private final LogAccessor logger = new LogAccessor(this.getClass());
    private final ReactivePulsarConsumerFactory<T> pulsarConsumerFactory;
    private final ReactivePulsarContainerProperties<T> pulsarContainerProperties;
    private boolean autoStartup = true;
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ReactiveMessageConsumerBuilderCustomizer<T> consumerCustomizer;
    private ReactiveMessagePipeline pipeline;

    /**
     * Creates a container that is initially not running.
     *
     * @param pulsarConsumerFactory factory used to create the consumer on start
     * @param pulsarContainerProperties properties (topics, subscription, schema,
     * handler, ...) read when the pipeline is built
     */
    public DefaultReactivePulsarMessageListenerContainer(ReactivePulsarConsumerFactory<T> pulsarConsumerFactory, ReactivePulsarContainerProperties<T> pulsarContainerProperties) {
        this.pulsarConsumerFactory = pulsarConsumerFactory;
        this.pulsarContainerProperties = pulsarContainerProperties;
    }

    /**
     * Returns the consumer factory supplied at construction time.
     *
     * @return the consumer factory
     */
    public ReactivePulsarConsumerFactory<T> getReactivePulsarConsumerFactory() {
        return this.pulsarConsumerFactory;
    }

    /**
     * Returns the container properties supplied at construction time.
     *
     * @return the container properties
     */
    @Override
    public ReactivePulsarContainerProperties<T> getContainerProperties() {
        return this.pulsarContainerProperties;
    }

    /**
     * Reports whether the container is currently flagged as running.
     *
     * @return {@code true} between a successful start and the following stop
     */
    public boolean isRunning() {
        return this.running.get();
    }

    /**
     * Sets the running flag.
     *
     * @param running the new value of the flag
     */
    protected void setRunning(boolean running) {
        this.running.set(running);
    }

    /**
     * Stores the given handler in the container properties so that it is used
     * the next time the pipeline is built.
     *
     * @param messageHandler the handler to use
     */
    @Override
    public void setupMessageHandler(ReactivePulsarMessageHandler messageHandler) {
        this.pulsarContainerProperties.setMessageHandler(messageHandler);
    }

    /**
     * Returns the auto-startup flag; it defaults to {@code true}.
     *
     * @return the auto-startup flag
     */
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * Sets the auto-startup flag.
     *
     * @param autoStartup the new value of the flag
     */
    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /**
     * Returns the additional consumer customizer, if one was set.
     *
     * @return the customizer, or {@code null} when none was set
     */
    public ReactiveMessageConsumerBuilderCustomizer<T> getConsumerCustomizer() {
        return this.consumerCustomizer;
    }

    /**
     * Sets an additional consumer customizer. When the pipeline is built it is
     * applied after the customizer derived from the container properties.
     *
     * @param consumerCustomizer the customizer, may be {@code null}
     */
    @Override
    public void setConsumerCustomizer(ReactiveMessageConsumerBuilderCustomizer<T> consumerCustomizer) {
        this.consumerCustomizer = consumerCustomizer;
    }

    /**
     * Starts the container if it is not already running.
     *
     * @throws NullPointerException if no message handler is set on the
     * container properties
     */
    public final void start() {
        this.lifecycleLock.lock();
        try {
            if (!this.isRunning()) {
                Objects.requireNonNull(this.pulsarContainerProperties.getMessageHandler(), "A ReactivePulsarMessageHandler must be provided");
                this.doStart();
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Stops the container if it is currently running.
     */
    public void stop() {
        this.lifecycleLock.lock();
        try {
            if (this.isRunning()) {
                this.doStop();
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Flags the container as running and starts a new pipeline. If the pipeline
     * cannot be created or started, the running flag is reset before the failure
     * is propagated, so that a later {@link #start()} can try again instead of
     * being skipped because the flag was left set.
     */
    private void doStart() {
        this.setRunning(true);
        try {
            this.pipeline = this.startPipeline(this.pulsarContainerProperties);
        }
        catch (RuntimeException | Error ex) {
            this.setRunning(false);
            throw ex;
        }
    }

    /**
     * Closes the current pipeline, if there is one, and flags the container as
     * not running. A failure while closing is logged and not propagated.
     *
     * <p>This method does not acquire the lifecycle lock itself; {@link #stop()}
     * calls it while holding the lock.
     */
    public void doStop() {
        ReactiveMessagePipeline currentPipeline = this.pipeline;
        try {
            // Nothing to close when no pipeline was ever started successfully.
            if (currentPipeline != null) {
                this.logger.info("Closing Pulsar Reactive pipeline.");
                currentPipeline.close();
            }
        }
        catch (Exception e) {
            this.logger.error(e, () -> "Error closing Pulsar Reactive pipeline.");
        }
        finally {
            this.pipeline = null;
            this.setRunning(false);
        }
    }

    /**
     * Creates a consumer from the given properties, builds a pipeline around
     * it and starts that pipeline.
     *
     * @param containerProperties the properties to read the configuration from
     * @return the started pipeline
     */
    private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<T> containerProperties) {
        List<ReactiveMessageConsumerBuilderCustomizer<T>> customizers = new ArrayList<>();
        customizers.add(this.containerPropertiesCustomizer(containerProperties));
        // The user supplied customizer goes last so it is applied after the defaults.
        if (this.consumerCustomizer != null) {
            customizers.add(this.consumerCustomizer);
        }
        ReactiveMessageConsumer<T> consumer = this.getReactivePulsarConsumerFactory().createConsumer(containerProperties.getSchema(), customizers);
        ReactiveMessagePipeline newPipeline = this.buildPipeline(consumer, containerProperties);
        newPipeline.start();
        return newPipeline;
    }

    /**
     * Returns a customizer that copies subscription type, subscription name,
     * topics and topics pattern from the properties onto the consumer builder,
     * skipping every value that is not set.
     */
    private ReactiveMessageConsumerBuilderCustomizer<T> containerPropertiesCustomizer(ReactivePulsarContainerProperties<T> containerProperties) {
        return builder -> {
            if (containerProperties.getSubscriptionType() != null) {
                builder.subscriptionType(containerProperties.getSubscriptionType());
            }
            if (containerProperties.getSubscriptionName() != null) {
                builder.subscriptionName(containerProperties.getSubscriptionName());
            }
            if (!CollectionUtils.isEmpty(containerProperties.getTopics())) {
                // Hand the builder its own copy of the configured topics.
                builder.topics(new ArrayList<>(containerProperties.getTopics()));
            }
            if (containerProperties.getTopicsPattern() != null) {
                builder.topicsPattern(containerProperties.getTopicsPattern());
            }
        };
    }

    /**
     * Builds (without starting) the pipeline for the given consumer, choosing
     * between streaming and one-by-one handling based on the handler type.
     */
    private ReactiveMessagePipeline buildPipeline(ReactiveMessageConsumer<T> consumer, ReactivePulsarContainerProperties<T> containerProperties) {
        ReactiveMessagePipelineBuilder<T> pipelineBuilder = ApiImplementationFactory.createReactiveMessageHandlerPipelineBuilder(consumer);
        ReactivePulsarMessageHandler messageHandler = containerProperties.getMessageHandler();
        if (messageHandler instanceof ReactivePulsarStreamingHandler) {
            // The handler type carries no type argument at runtime, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            ReactivePulsarStreamingHandler<T> streamingHandler = (ReactivePulsarStreamingHandler<T>) messageHandler;
            return pipelineBuilder.streamingMessageHandler(streamingHandler::received).build();
        }
        @SuppressWarnings("unchecked")
        ReactivePulsarOneByOneMessageHandler<T> oneByOneHandler = (ReactivePulsarOneByOneMessageHandler<T>) messageHandler;
        ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<T> messagePipelineBuilder = pipelineBuilder.messageHandler(oneByOneHandler::received).handlingTimeout(containerProperties.getHandlingTimeout());
        // Always build from the builder returned by the last fluent call rather than
        // from an earlier reference, so the result does not depend on every builder
        // method returning the same mutable instance.
        ReactiveMessagePipelineBuilder<T> configuredBuilder = messagePipelineBuilder;
        if (containerProperties.getConcurrency() > 0) {
            ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder<T> concurrentPipelineBuilder = messagePipelineBuilder.concurrency(containerProperties.getConcurrency());
            configuredBuilder = concurrentPipelineBuilder;
            if (containerProperties.isUseKeyOrderedProcessing()) {
                configuredBuilder = concurrentPipelineBuilder.useKeyOrderedProcessing();
            }
        }
        return configuredBuilder.build();
    }
}

