/*
 * Decompiled with CFR 0.152.
 */
package com.github.lhotari.reactive.pulsar.internal.adapter;

import com.github.lhotari.reactive.pulsar.adapter.ConsumerConfigurer;
import com.github.lhotari.reactive.pulsar.adapter.MessageResult;
import com.github.lhotari.reactive.pulsar.adapter.ReactiveMessageConsumer;
import com.github.lhotari.reactive.pulsar.internal.adapter.PulsarFutureAdapter;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveConsumerAdapter;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveConsumerAdapterFactory;
import java.util.function.Function;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class DefaultReactiveMessageConsumer<T>
implements ReactiveMessageConsumer<T> {
    private final ReactiveConsumerAdapterFactory reactiveConsumerAdapterFactory;
    private final Schema<T> schema;
    private final ConsumerConfigurer<T> consumerConfigurer;
    private final String topicName;
    private final boolean acknowledgeAsynchronously;
    private final Scheduler acknowledgeScheduler;

    public DefaultReactiveMessageConsumer(ReactiveConsumerAdapterFactory reactiveConsumerAdapterFactory, Schema<T> schema, ConsumerConfigurer<T> consumerConfigurer, String topicName, boolean acknowledgeAsynchronously, Scheduler acknowledgeScheduler) {
        this.reactiveConsumerAdapterFactory = reactiveConsumerAdapterFactory;
        this.schema = schema;
        this.consumerConfigurer = consumerConfigurer;
        this.topicName = topicName;
        this.acknowledgeAsynchronously = acknowledgeAsynchronously;
        this.acknowledgeScheduler = acknowledgeScheduler;
    }

    @Override
    public <R> Mono<R> consumeMessage(Function<Mono<Message<T>>, Mono<MessageResult<R>>> messageHandler) {
        return this.createReactiveConsumerAdapter().usingConsumer(consumer -> Mono.using(() -> Schedulers.single((Scheduler)this.acknowledgeScheduler), pinnedAcknowledgeScheduler -> ((Mono)messageHandler.apply(DefaultReactiveMessageConsumer.readNextMessage(consumer))).delayUntil(messageResult -> this.handleAcknowledgement((Consumer<T>)consumer, (MessageResult)messageResult, (Scheduler)pinnedAcknowledgeScheduler)).handle(this::handleMessageResult), Scheduler::dispose));
    }

    private <R> Mono<?> handleAcknowledgement(Consumer<T> consumer, MessageResult<R> messageResult, Scheduler pinnedAcknowledgeScheduler) {
        if (messageResult.getMessageId() != null) {
            Mono acknowledgementMono = messageResult.isAcknowledgeMessage() ? Mono.fromFuture(() -> consumer.acknowledgeAsync(messageResult.getMessageId())) : Mono.fromRunnable(() -> consumer.negativeAcknowledge(messageResult.getMessageId()));
            acknowledgementMono = acknowledgementMono.subscribeOn(pinnedAcknowledgeScheduler);
            if (this.acknowledgeAsynchronously) {
                return Mono.fromRunnable(() -> ((Mono)acknowledgementMono).subscribe());
            }
            return acknowledgementMono;
        }
        return Mono.empty();
    }

    static <T> Mono<Message<T>> readNextMessage(Consumer<T> consumer) {
        return PulsarFutureAdapter.adaptPulsarFuture(() -> consumer.receiveAsync());
    }

    private ReactiveConsumerAdapter<T> createReactiveConsumerAdapter() {
        return this.reactiveConsumerAdapterFactory.create(pulsarClient -> {
            ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(this.schema);
            if (this.topicName != null) {
                consumerBuilder.topic(new String[]{this.topicName});
            }
            if (this.consumerConfigurer != null) {
                this.consumerConfigurer.configure(consumerBuilder);
            }
            return consumerBuilder;
        });
    }

    @Override
    public <R> Flux<R> consumeMessages(Function<Flux<Message<T>>, Flux<MessageResult<R>>> messageHandler) {
        return this.createReactiveConsumerAdapter().usingConsumerMany(consumer -> Flux.using(() -> Schedulers.single((Scheduler)this.acknowledgeScheduler), pinnedAcknowledgeScheduler -> ((Flux)messageHandler.apply(DefaultReactiveMessageConsumer.readNextMessage(consumer).repeat())).delayUntil(messageResult -> this.handleAcknowledgement((Consumer<T>)consumer, (MessageResult)messageResult, (Scheduler)pinnedAcknowledgeScheduler)).handle(this::handleMessageResult), Scheduler::dispose));
    }

    @Override
    public Mono<Void> consumeNothing() {
        return this.createReactiveConsumerAdapter().usingConsumer(consumer -> Mono.empty());
    }

    private <R> void handleMessageResult(MessageResult<R> messageResult, SynchronousSink<R> sink) {
        R value = messageResult.getValue();
        if (value != null) {
            sink.next(value);
        }
    }
}

