/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.ezorm.rdb.executor.jdbc;

import java.sql.Connection;
import java.util.function.Function;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.jdbc.JdbcSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

public abstract class JdbcReactiveSqlExecutor
extends JdbcSqlExecutor
implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(JdbcReactiveSqlExecutor.class);

    public JdbcReactiveSqlExecutor() {
        super(log);
    }

    @Deprecated
    public abstract Mono<Connection> getConnection();

    protected <T> Flux<T> doInConnection(Function<Connection, Publisher<T>> handler) {
        return this.getConnection().flatMapMany(handler);
    }

    @Override
    public Mono<Integer> update(Publisher<SqlRequest> request) {
        return Flux.deferContextual(ctx -> this.doInConnection(connection -> this.toFlux(request).map(sql -> this.doUpdate((Logger)ctx.getOrDefault(Logger.class, (Object)log), (Connection)connection, (SqlRequest)sql)).reduce(Math::addExact))).last((Object)0).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel());
    }

    @Override
    public Mono<Void> execute(Publisher<SqlRequest> request) {
        return Flux.deferContextual(ctx -> this.doInConnection(connection -> this.toFlux(request).doOnNext(sql -> this.doExecute((Logger)ctx.getOrDefault(Logger.class, (Object)log), (Connection)connection, (SqlRequest)sql)))).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel()).then();
    }

    @Override
    public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
        return Flux.deferContextual(ctx -> {
            Logger logger = (Logger)ctx.getOrDefault(Logger.class, (Object)log);
            return Flux.create(sink -> {
                Disposable.Composite disposable = Disposables.composite();
                Disposable queryDisposable = this.doInConnection(connection -> this.toFlux(request).doOnNext(sql -> this.doSelect(logger, (Connection)connection, (SqlRequest)sql, ResultWrappers.consumer(wrapper, arg_0 -> ((FluxSink)sink).next(arg_0)), disposable)).then()).subscribeOn(Schedulers.boundedElastic()).subscribe(ignore -> sink.complete(), arg_0 -> ((FluxSink)sink).error(arg_0), () -> ((FluxSink)sink).complete(), Context.of((ContextView)sink.contextView()));
                disposable.add(queryDisposable);
                sink.onDispose((Disposable)disposable);
            });
        }).publishOn(Schedulers.parallel());
    }

    protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {
        return Flux.from(request);
    }
}

