/*
 * Decompiled with CFR 0.152.
 */
package com.github.lhotari.reactive.pulsar.internal.adapter;

import com.github.lhotari.reactive.pulsar.adapter.EndOfStreamAction;
import com.github.lhotari.reactive.pulsar.adapter.InstantStartAtSpec;
import com.github.lhotari.reactive.pulsar.adapter.MessageIdStartAtSpec;
import com.github.lhotari.reactive.pulsar.adapter.ReactiveMessageReader;
import com.github.lhotari.reactive.pulsar.adapter.ReaderConfigurer;
import com.github.lhotari.reactive.pulsar.adapter.StartAtSpec;
import com.github.lhotari.reactive.pulsar.internal.adapter.PulsarFutureAdapter;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveReaderAdapter;
import com.github.lhotari.reactive.pulsar.resourceadapter.ReactiveReaderAdapterFactory;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link ReactiveMessageReader} implementation that obtains a Pulsar {@link Reader} through a
 * {@link ReactiveReaderAdapterFactory} and exposes its messages as Reactor publishers.
 *
 * <p>A new {@link ReactiveReaderAdapter} is requested from the factory on every call to
 * {@link #readMessage()} or {@link #readMessages()}; this class holds only configuration.
 *
 * @param <T> payload type of the messages, as described by the configured {@link Schema}
 */
class DefaultReactiveMessageReader<T> implements ReactiveMessageReader<T> {
    private final Schema<T> schema;
    private final ReaderConfigurer<T> readerConfigurer;
    private final String topicName;
    private final ReactiveReaderAdapterFactory reactiveReaderAdapterFactory;
    private final StartAtSpec startAtSpec;
    private final EndOfStreamAction endOfStreamAction;

    /**
     * Stores the given collaborators and settings; no reader is created here.
     *
     * @param reactiveReaderAdapterFactory factory used to create a reader adapter per read call
     * @param schema schema passed to {@link PulsarClient#newReader(Schema)}
     * @param readerConfigurer optional extra builder customization; skipped when {@code null}
     * @param topicName topic to read from; not applied to the builder when {@code null}
     * @param startAtSpec start position; the builder's start position is left untouched when {@code null}
     * @param endOfStreamAction when {@link EndOfStreamAction#COMPLETE}, reads complete instead of
     *        waiting once the reader reports that no message is available
     */
    public DefaultReactiveMessageReader(ReactiveReaderAdapterFactory reactiveReaderAdapterFactory, Schema<T> schema, ReaderConfigurer<T> readerConfigurer, String topicName, StartAtSpec startAtSpec, EndOfStreamAction endOfStreamAction) {
        this.schema = schema;
        this.readerConfigurer = readerConfigurer;
        this.topicName = topicName;
        this.reactiveReaderAdapterFactory = reactiveReaderAdapterFactory;
        this.startAtSpec = startAtSpec;
        this.endOfStreamAction = endOfStreamAction;
    }

    /**
     * Asks the factory for a reader adapter whose reader starts at the given position.
     *
     * @param startAtSpec start position to apply, or {@code null} to leave the builder's start position untouched
     * @return the adapter created by the factory
     */
    ReactiveReaderAdapter<T> createReactiveReaderAdapter(StartAtSpec startAtSpec) {
        return this.reactiveReaderAdapterFactory.create(this.readerStartingAt(startAtSpec));
    }

    /**
     * Builds the function that turns a {@link PulsarClient} into a configured {@link ReaderBuilder}.
     *
     * <p>Order matters: topic and start position are applied first and the {@link ReaderConfigurer}
     * runs last, so the configurer sees (and may override) those settings.
     *
     * @param startAtSpec start position to apply, or {@code null} for none
     * @return function producing the configured builder; it throws {@link IllegalArgumentException}
     *         when invoked if an instant-based start position fails the rollback check
     */
    private Function<PulsarClient, ReaderBuilder<T>> readerStartingAt(StartAtSpec startAtSpec) {
        return pulsarClient -> {
            ReaderBuilder<T> readerBuilder = pulsarClient.newReader(this.schema);
            if (this.topicName != null) {
                readerBuilder.topic(this.topicName);
            }
            this.applyStartPosition(readerBuilder, startAtSpec);
            if (this.readerConfigurer != null) {
                this.readerConfigurer.configure(readerBuilder);
            }
            return readerBuilder;
        };
    }

    /**
     * Applies the start position described by {@code startAtSpec} to the builder.
     *
     * @param readerBuilder builder to configure
     * @param startAtSpec start position; {@code null} leaves the builder unchanged
     * @throws IllegalArgumentException if the computed rollback duration is negative
     * @throws ClassCastException if the spec is neither a {@link MessageIdStartAtSpec} nor an
     *         {@link InstantStartAtSpec}
     */
    private void applyStartPosition(ReaderBuilder<T> readerBuilder, StartAtSpec startAtSpec) {
        if (startAtSpec == null) {
            return;
        }
        if (startAtSpec instanceof MessageIdStartAtSpec) {
            MessageIdStartAtSpec messageIdStartAtSpec = (MessageIdStartAtSpec) startAtSpec;
            readerBuilder.startMessageId(messageIdStartAtSpec.getMessageId());
            if (messageIdStartAtSpec.isInclusive()) {
                readerBuilder.startMessageIdInclusive();
            }
            return;
        }
        // Any other spec is treated as instant-based; the cast fails for unknown spec types.
        InstantStartAtSpec instantStartAtSpec = (InstantStartAtSpec) startAtSpec;
        // The whole-second difference is padded by one second, presumably so that truncation
        // cannot place the rollback point after the requested instant.
        long rollbackDuration = ChronoUnit.SECONDS.between(instantStartAtSpec.getInstant(), Instant.now()) + 1L;
        // NOTE(review): because of the +1 padding, an instant less than two seconds in the
        // future still passes this check. Confirm whether that tolerance is intended.
        if (rollbackDuration < 0L) {
            throw new IllegalArgumentException("InstantStartAtSpec must be in the past.");
        }
        readerBuilder.startMessageFromRollbackDuration(rollbackDuration, TimeUnit.SECONDS);
    }

    /**
     * Creates a {@link Mono} that reads the next message from the reader.
     *
     * <p>With {@link EndOfStreamAction#COMPLETE} the reader is first asked whether a message is
     * available; if it answers {@code false} the returned Mono completes empty and no read is
     * issued. For any other action the read is issued unconditionally.
     *
     * @param reader reader to read from
     * @param endOfStreamAction controls the availability check described above
     * @param <T> payload type of the message
     * @return Mono of the next message, possibly empty in {@code COMPLETE} mode
     */
    static <T> Mono<Message<T>> readNextMessage(Reader<T> reader, EndOfStreamAction endOfStreamAction) {
        Mono<Message<T>> messageMono = PulsarFutureAdapter.adaptPulsarFuture(() -> reader.readNextAsync());
        if (endOfStreamAction == EndOfStreamAction.COMPLETE) {
            Mono<Boolean> messageAvailable = PulsarFutureAdapter.adaptPulsarFuture(() -> reader.hasMessageAvailableAsync());
            return messageAvailable.filter(Boolean::booleanValue).flatMap(available -> messageMono);
        }
        return messageMono;
    }

    /**
     * Reads a single message using a newly created reader adapter.
     *
     * @return Mono of the next message; may be empty when the end-of-stream action is {@code COMPLETE}
     */
    @Override
    public Mono<Message<T>> readMessage() {
        return this.createReactiveReaderAdapter(this.startAtSpec).usingReader(reader -> DefaultReactiveMessageReader.readNextMessage(reader, this.endOfStreamAction));
    }

    /**
     * Reads messages repeatedly using a newly created reader adapter.
     *
     * <p>With {@link EndOfStreamAction#COMPLETE} the read is repeated only while the previous
     * attempt emitted a message, so the Flux completes at the first empty read. Otherwise the
     * read is repeated indefinitely.
     *
     * @return Flux of messages
     */
    @Override
    public Flux<Message<T>> readMessages() {
        return this.createReactiveReaderAdapter(this.startAtSpec).usingReaderMany(reader -> {
            Mono<Message<T>> messageMono = DefaultReactiveMessageReader.readNextMessage(reader, this.endOfStreamAction);
            if (this.endOfStreamAction == EndOfStreamAction.COMPLETE) {
                // The repeatWhen companion carries the number of items the previous
                // subscription emitted; stop repeating once an attempt emitted nothing.
                return messageMono.repeatWhen(flux -> flux.takeWhile(emitted -> emitted > 0L));
            }
            return messageMono.repeat();
        });
    }
}

