/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.database.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.database.common.QueryContext;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry;

/**
 * Reactive access point to R2DBC {@link Connection}s together with the
 * {@link DataBaseTelemetry} used to instrument queries.
 *
 * <p>Only {@link #query(QueryContext, Consumer, Function)} has a default
 * implementation here; the behaviour of every other method is defined by the
 * implementing class.
 */
public interface R2dbcConnectionFactory {
    /**
     * Returns a {@link Mono} emitting a {@link Connection}.
     *
     * <p>NOTE(review): presumably the connection already bound to the current
     * reactive context (e.g. by {@link #inTx(Function)}) — confirm against the
     * implementation.
     *
     * @return a mono of the connection
     */
    Mono<Connection> currentConnection();

    /**
     * Returns a {@link Mono} emitting a {@link Connection}.
     *
     * <p>NOTE(review): presumably a freshly acquired connection whose lifecycle
     * is owned by the caller — confirm against the implementation.
     *
     * @return a mono of the connection
     */
    Mono<Connection> newConnection();

    /**
     * Returns the telemetry used by {@link #query(QueryContext, Consumer, Function)}
     * to create a per-query telemetry context.
     *
     * @return the database telemetry
     */
    DataBaseTelemetry telemetry();

    /**
     * Applies the callback to a {@link Connection} and returns its result.
     *
     * <p>NOTE(review): the name suggests the callback runs inside a transaction;
     * commit/rollback semantics are implementation-defined — confirm there.
     *
     * @param var1 callback receiving the connection and producing the result
     * @param <T>  result type
     * @return the mono produced from the callback
     */
    <T> Mono<T> inTx(Function<Connection, Mono<T>> var1);

    /**
     * Applies the callback to a {@link Connection} and returns its result.
     * How the connection is obtained and released is implementation-defined.
     *
     * @param var1 callback receiving the connection and producing the result
     * @param <T>  result type
     * @return the mono produced from the callback
     */
    <T> Mono<T> withConnection(Function<Connection, Mono<T>> var1);

    /**
     * {@link Flux} counterpart of {@link #withConnection(Function)}.
     *
     * @param var1 callback receiving the connection and producing the result stream
     * @param <T>  element type
     * @return the flux produced from the callback
     */
    <T> Flux<T> withConnectionFlux(Function<Connection, Flux<T>> var1);

    /**
     * Executes the SQL of {@code queryContext} through {@link #withConnection(Function)}
     * and reports the outcome to telemetry.
     *
     * <p>Work is deferred until subscription. On each subscription a telemetry
     * context is created from the subscriber's Reactor context, a statement is
     * created from {@code queryContext.sql()}, handed to {@code statementSetter},
     * executed, and its results are passed to {@code resultFluxConsumer}.
     * The telemetry context is closed with {@code null} on completion and with
     * the error on failure.
     *
     * <p>NOTE(review): only completion and error signals close the telemetry
     * context; a cancelled subscription does not trigger {@code close} here.
     *
     * @param queryContext       supplies the SQL text and is passed to telemetry
     * @param statementSetter    configures the created statement (e.g. binds parameters)
     * @param resultFluxConsumer maps the stream of execution results to the final value
     * @param <T>                result type
     * @return a mono emitting whatever {@code resultFluxConsumer} produces
     */
    default <T> Mono<T> query(QueryContext queryContext, Consumer<Statement> statementSetter, Function<Flux<Result>, Mono<T>> resultFluxConsumer) {
        return Mono.deferContextual(ctx -> {
            // Created per subscription so every execution gets its own telemetry context.
            DataBaseTelemetry.DataBaseTelemetryContext telemetry = this.telemetry().createContext(Context.Reactor.current(ctx), queryContext);
            return this.<T>withConnection(connection -> {
                Statement stmt = connection.createStatement(queryContext.sql());
                statementSetter.accept(stmt);
                // No raw-type casts: Flux.from yields Flux<Result>, keeping the chain fully typed.
                return resultFluxConsumer.apply(Flux.from(stmt.execute()));
            }).doOnEach(s -> {
                if (s.isOnComplete()) {
                    telemetry.close(null);
                } else if (s.isOnError()) {
                    telemetry.close(s.getThrowable());
                }
            });
        });
    }
}

