/*
 * Decompiled with CFR 0.152.
 */
package com.github.lhotari.reactive.pulsar.internal.adapter;

import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.Murmur3_32Hash;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.scheduler.Scheduler;

/**
 * Static helpers that split a stream of Pulsar messages into a fixed number of
 * processing groups derived from each message's key, and then run a handler over
 * every group with {@code concatMap}, so the messages of one group are handled
 * one after another while separate groups may be handled side by side.
 */
public class InKeyOrderMessageProcessors {
    /**
     * Computes the processing group of a message by hashing its key bytes with
     * {@link Murmur3_32Hash} and taking the remainder of a division by
     * {@code numberOfGroups}.
     *
     * <p>NOTE(review): the remainder keeps the sign of the hash, so a result in
     * {@code [0, numberOfGroups)} presumes {@code makeHash} never returns a
     * negative value - confirm against the Pulsar implementation.
     *
     * @param message the message whose group is resolved
     * @param numberOfGroups divisor applied to the key hash
     * @return the key hash modulo {@code numberOfGroups}
     * @throws ArithmeticException if {@code numberOfGroups} is zero
     */
    public static int resolveProcessingGroupForMessage(Message<?> message, int numberOfGroups) {
        // Resolve the key bytes before touching the hash instance, as the original did.
        byte[] hashInput = getMessageKeyBytes(message);
        int hash = Murmur3_32Hash.getInstance().makeHash(hashInput);
        return hash % numberOfGroups;
    }

    /**
     * Selects the bytes used for hashing a message: the ordering key when the
     * message has one, otherwise the key bytes when the message has a key. When
     * neither yields a non-empty array, the serialized message id is used instead.
     *
     * @param message the message to inspect
     * @return the bytes to hash for the message
     */
    private static byte[] getMessageKeyBytes(Message<?> message) {
        byte[] candidate;
        if (message.hasOrderingKey()) {
            candidate = message.getOrderingKey();
        } else if (message.hasKey()) {
            candidate = message.getKeyBytes();
        } else {
            candidate = null;
        }
        boolean usable = candidate != null && candidate.length != 0;
        // Fall back to the message id so that keyless messages still get a group.
        return usable ? candidate : message.getMessageId().toByteArray();
    }

    /**
     * Groups the messages of a flux by the value returned from
     * {@link #resolveProcessingGroupForMessage(Message, int)}.
     *
     * @param messageFlux the messages to group
     * @param numberOfGroups forwarded to the group resolution
     * @param <T> message payload type
     * @return a flux of grouped fluxes keyed by the resolved group number
     */
    public static <T> Flux<GroupedFlux<Integer, Message<T>>> groupByProcessingGroup(Flux<Message<T>> messageFlux, int numberOfGroups) {
        Function<Message<T>, Integer> groupSelector =
            msg -> resolveProcessingGroupForMessage(msg, numberOfGroups);
        return messageFlux.groupBy(groupSelector);
    }

    /**
     * Groups the messages into {@code concurrency} processing groups and merges the
     * per-group results with {@code flatMap}, passing {@code concurrency} as its
     * concurrency argument. Within a group the handler is applied through
     * {@code concatMap}, and each group pipeline is subscribed on {@code scheduler}.
     *
     * @param messageFlux the messages to process
     * @param messageHandler maps a message to a publisher of results
     * @param scheduler scheduler passed to {@code subscribeOn} for every group
     * @param concurrency number of groups and the {@code flatMap} concurrency
     * @param <T> message payload type
     * @param <R> result type
     * @return the merged results of all groups
     */
    public static <T, R> Flux<R> processInKeyOrderConcurrently(Flux<Message<T>> messageFlux, Function<? super Message<T>, ? extends Publisher<? extends R>> messageHandler, Scheduler scheduler, int concurrency) {
        Flux<GroupedFlux<Integer, Message<T>>> groups = groupByProcessingGroup(messageFlux, concurrency);
        return groups.flatMap(
            group -> group.concatMap(messageHandler).subscribeOn(scheduler),
            concurrency);
    }

    /**
     * Groups the messages into {@code parallelism} processing groups, splits the
     * groups with {@code parallel(parallelism)}, runs them on {@code scheduler} and
     * applies the handler to each group through {@code concatMap}. The results are
     * merged back into a single flux with {@code sequential()}.
     *
     * @param messageFlux the messages to process
     * @param messageHandler maps a message to a publisher of results
     * @param scheduler scheduler passed to {@code runOn}
     * @param parallelism number of groups and the argument of {@code parallel}
     * @param <T> message payload type
     * @param <R> result type
     * @return the merged results of all groups
     */
    public static <T, R> Flux<R> processInKeyOrderInParallel(Flux<Message<T>> messageFlux, Function<? super Message<T>, ? extends Publisher<? extends R>> messageHandler, Scheduler scheduler, int parallelism) {
        Flux<GroupedFlux<Integer, Message<T>>> groups = groupByProcessingGroup(messageFlux, parallelism);
        return groups
            .parallel(parallelism)
            .runOn(scheduler)
            .flatMap(group -> group.concatMap(messageHandler))
            .sequential();
    }
}

