/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.vertx.kafka.operations;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration;
import org.apache.camel.component.vertx.kafka.operations.TopicSubscription;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.function.TriConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/**
 * Drives a Vert.x {@link KafkaConsumer} according to a {@link VertxKafkaConfiguration}.
 *
 * <p>The callback-style consumer operations (subscribe, assign, seek) are wrapped in Reactor
 * {@link Mono}/{@link Flux} pipelines so they can be chained, and any failure is routed to the
 * error handler supplied by the caller.
 */
public class VertxKafkaConsumerOperations {
    private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaConsumerOperations.class);

    private final KafkaConsumer<Object, Object> kafkaConsumer;
    private final VertxKafkaConfiguration configuration;

    /**
     * Creates the operations wrapper.
     *
     * @param kafkaConsumer the Vert.x consumer all operations are issued against
     * @param configuration the configuration supplying topic, partition id and seek settings
     */
    public VertxKafkaConsumerOperations(KafkaConsumer<Object, Object> kafkaConsumer, VertxKafkaConfiguration configuration) {
        this.kafkaConsumer = kafkaConsumer;
        this.configuration = configuration;
    }

    /**
     * Registers the given handlers on the consumer and starts consuming.
     *
     * <p>When no partition id is configured the consumer subscribes to the configured topics;
     * otherwise it is assigned to the configured topic partitions.
     *
     * @param recordHandler invoked for every record delivered by the consumer
     * @param errorHandler invoked for consumer exceptions and for failures of the
     *                     subscribe/assign/seek operations
     * @throws IllegalArgumentException if no topic is configured
     */
    public void receiveEvents(Consumer<KafkaConsumerRecord<Object, Object>> recordHandler, Consumer<Throwable> errorHandler) {
        if (ObjectHelper.isEmpty(configuration.getTopic())) {
            throw new IllegalArgumentException("Topic or list of topics need to be set in the topic config.");
        }
        TopicSubscription topicSubscription = new TopicSubscription(
                configuration.getTopic(),
                configuration.getPartitionId(),
                configuration.getSeekToOffset(),
                configuration.getSeekToPosition());

        // Handlers are registered before subscribing/assigning.
        kafkaConsumer.handler(recordHandler::accept);
        kafkaConsumer.exceptionHandler(errorHandler::accept);

        if (ObjectHelper.isEmpty(topicSubscription.getPartitionId())) {
            subscribe(topicSubscription, errorHandler);
        } else {
            assign(topicSubscription, errorHandler);
        }
    }

    /**
     * Subscribes to the topics of the subscription, installing the seek-on-assignment hook first.
     */
    private void subscribe(TopicSubscription topicSubscription, Consumer<Throwable> errorHandler) {
        LOG.info("Subscribing to {} topics", topicSubscription.getConfiguredTopicName());
        // The partitions-assigned handler is installed before the subscribe call is issued.
        seekOnPartitionAssignment(topicSubscription, errorHandler);
        subscribeToTopics(topicSubscription.getTopics()).subscribe(unused -> {}, errorHandler, () -> {});
    }

    /**
     * If a seek offset or position is configured, seeks every partition reported by the
     * partitions-assigned callback. Does nothing otherwise.
     */
    private void seekOnPartitionAssignment(TopicSubscription topicSubscription, Consumer<Throwable> errorHandler) {
        if (isSeekToSet(topicSubscription)) {
            getTopicPartitionsOnPartitionAssignment()
                    .flatMap(topicPartition -> seekToOffsetOrPositionInPartition(topicPartition, topicSubscription))
                    .subscribe(result -> {}, errorHandler, () -> LOG.info("Seeking partitions is done."));
        }
    }

    /**
     * @return {@code true} when either a seek offset or a seek position is configured
     */
    private boolean isSeekToSet(TopicSubscription topicSubscription) {
        return ObjectHelper.isNotEmpty(topicSubscription.getSeekToOffset())
                || ObjectHelper.isNotEmpty(topicSubscription.getSeekToPosition());
    }

    /**
     * Installs a partitions-assigned handler on the consumer and emits each assigned partition.
     *
     * <p>The sink is completed at the end of the handler invocation, i.e. after the partitions
     * of that callback have been emitted.
     */
    private Flux<TopicPartition> getTopicPartitionsOnPartitionAssignment() {
        return Flux.create(sink -> kafkaConsumer.partitionsAssignedHandler(partitions -> {
            LOG.info("Partition {} is assigned to consumer", partitions);
            partitions.forEach(topicPartition -> {
                LOG.info("Partition {} is assigned to consumer for topic {}", topicPartition.getPartition(), topicPartition.getTopic());
                sink.next(topicPartition);
            });
            sink.complete();
        }));
    }

    /**
     * Seeks a single partition: to the configured offset when one is set, otherwise to the
     * configured position.
     */
    private Mono<Void> seekToOffsetOrPositionInPartition(TopicPartition topicPartition, TopicSubscription topicSubscription) {
        if (ObjectHelper.isNotEmpty(topicSubscription.getSeekToOffset())) {
            LOG.info("Seeking topic {} with partition {} to offset {}.",
                    topicPartition.getTopic(), topicPartition.getPartition(), topicSubscription.getSeekToOffset());
            return wrapToMono(
                    (partition, offset, handler) -> kafkaConsumer.seek(partition, offset, handler),
                    topicPartition,
                    topicSubscription.getSeekToOffset());
        }
        return seekToPosition(topicPartition, topicSubscription.getSeekToPosition());
    }

    /**
     * Seeks a partition to its beginning or end.
     *
     * <p>A {@code null} or unrecognised position is logged as a warning and results in an empty
     * {@link Mono}, so no seek is issued.
     */
    private Mono<Void> seekToPosition(TopicPartition topicPartition, TopicSubscription.OffsetPosition position) {
        // Guard first: switching on a null enum reference would throw a NullPointerException.
        if (position != null) {
            switch (position) {
                case BEGINNING:
                    LOG.info("Seeking topic {} with partition {} to the beginning.", topicPartition.getTopic(), topicPartition.getPartition());
                    return wrapToMono((partition, handler) -> kafkaConsumer.seekToBeginning(partition, handler), topicPartition);
                case END:
                    LOG.info("Seeking topic {} with partition {} to the end.", topicPartition.getTopic(), topicPartition.getPartition());
                    return wrapToMono((partition, handler) -> kafkaConsumer.seekToEnd(partition, handler), topicPartition);
                default:
                    break;
            }
        }
        LOG.warn("No valid positions being set, hence the seeking operation will be ignored.");
        return Mono.empty();
    }

    /**
     * Wraps the consumer's subscribe call for the given topics in a {@link Mono}.
     */
    private Mono<Void> subscribeToTopics(Set<String> topics) {
        return wrapToMono((topicNames, handler) -> kafkaConsumer.subscribe(topicNames, handler), topics);
    }

    /**
     * Assigns the consumer to the subscription's topic partitions and then performs any
     * configured seek on those partitions.
     */
    private void assign(TopicSubscription topicSubscription, Consumer<Throwable> errorHandler) {
        LOG.info("Assigning topics {} to partition {}", topicSubscription.getConfiguredTopicName(), topicSubscription.getPartitionId());
        assignToPartitions(topicSubscription.getTopicPartitions())
                .then(seekPartitionsManually(topicSubscription))
                .subscribe(unused -> {}, errorHandler, () -> {});
    }

    /**
     * Builds a {@link Mono} that seeks every topic partition of the subscription, or an empty
     * {@link Mono} when no seek offset/position is configured.
     */
    private Mono<Void> seekPartitionsManually(TopicSubscription topicSubscription) {
        if (isSeekToSet(topicSubscription)) {
            return Flux.fromIterable(topicSubscription.getTopicPartitions())
                    .flatMap(topicPartition -> seekToOffsetOrPositionInPartition(topicPartition, topicSubscription))
                    .doOnComplete(() -> LOG.info("Seeking partitions is done."))
                    .then();
        }
        return Mono.empty();
    }

    /**
     * Wraps the consumer's assign call for the given partitions in a {@link Mono}.
     */
    private Mono<Void> assignToPartitions(Set<TopicPartition> topicPartitions) {
        return wrapToMono((partitions, handler) -> kafkaConsumer.assign(partitions, handler), topicPartitions);
    }

    /**
     * Adapts an operation that reports a plain value through a {@link Handler} into a {@link Mono}.
     */
    private <R> Mono<R> wrapResultToMono(Consumer<Handler<R>> fn) {
        return Mono.create(sink -> fn.accept(value -> sink.success(value)));
    }

    /**
     * Adapts a no-argument operation that reports an {@link AsyncResult} into a {@link Mono}.
     */
    private <R> Mono<R> wrapToMono(Consumer<Handler<AsyncResult<R>>> fn) {
        return Mono.create(sink -> fn.accept(result -> wrapAsyncResult(sink, result)));
    }

    /**
     * Adapts a one-argument operation that reports an {@link AsyncResult} into a {@link Mono}.
     */
    private <R, V> Mono<R> wrapToMono(BiConsumer<V, Handler<AsyncResult<R>>> fn, V input) {
        return Mono.create(sink -> fn.accept(input, result -> wrapAsyncResult(sink, result)));
    }

    /**
     * Adapts a two-argument operation that reports an {@link AsyncResult} into a {@link Mono}.
     */
    private <R, V1, V2> Mono<R> wrapToMono(TriConsumer<V1, V2, Handler<AsyncResult<R>>> fn, V1 input1, V2 input2) {
        return Mono.create(sink -> fn.accept(input1, input2, result -> wrapAsyncResult(sink, result)));
    }

    /**
     * Forwards an {@link AsyncResult} to the sink: the cause on failure, the result otherwise.
     */
    private <R> void wrapAsyncResult(MonoSink<R> sink, AsyncResult<R> result) {
        if (result.failed()) {
            sink.error(result.cause());
        } else {
            sink.success(result.result());
        }
    }
}

