/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.sender.internals;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaOutbound;
import reactor.kafka.sender.TransactionManager;
import reactor.kafka.sender.internals.DefaultKafkaSender;
import reactor.kafka.sender.internals.KafkaOutboundThen;

class DefaultKafkaOutbound<K, V>
implements KafkaOutbound<K, V> {
    final DefaultKafkaSender<K, V> sender;

    DefaultKafkaOutbound(DefaultKafkaSender<K, V> sender) {
        this.sender = sender;
    }

    @Override
    public KafkaOutbound<K, V> send(Publisher<? extends ProducerRecord<K, V>> records) {
        return this.then((Publisher<Void>)this.sender.doSend(records).then());
    }

    @Override
    public KafkaOutbound<K, V> sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K, V>>> transactionRecords) {
        return this.then((Publisher<Void>)Flux.from(transactionRecords).publishOn(this.sender.senderOptions.scheduler()).concatMapDelayError(this::transaction, false, 1));
    }

    private Mono<Void> transaction(Publisher<? extends ProducerRecord<K, V>> transactionRecords) {
        TransactionManager transactionManager = this.sender.transactionManager();
        return transactionManager.begin().thenMany(this.sender.doSend(transactionRecords)).concatWith(transactionManager.commit()).onErrorResume(e -> transactionManager.abort().then(Mono.error((Throwable)e))).publishOn(this.sender.senderOptions.scheduler()).then();
    }

    @Override
    public KafkaOutbound<K, V> then(Publisher<Void> other) {
        return new KafkaOutboundThen<K, V>(this.sender, this, other);
    }

    @Override
    public Mono<Void> then() {
        return Mono.empty();
    }
}

