/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.stress;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Driver;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.stress.AbstractContext;
import org.neo4j.driver.stress.AbstractRxQuery;
import org.neo4j.driver.summary.ResultSummary;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RxReadQueryWithRetries<C extends AbstractContext>
extends AbstractRxQuery<C> {
    public RxReadQueryWithRetries(Driver driver, boolean useBookmark) {
        super(driver, useBookmark);
    }

    @Override
    public CompletionStage<Void> execute(C context) {
        CompletableFuture<Void> queryFinished = new CompletableFuture<Void>();
        Flux.usingWhen((Publisher)Mono.fromSupplier(() -> this.newSession(AccessMode.READ, context)), this::processAndGetSummary, RxSession::close).subscribe(summary -> {
            queryFinished.complete(null);
            context.readCompleted((ResultSummary)summary);
        }, error -> queryFinished.complete(null));
        return queryFinished;
    }

    private Publisher<ResultSummary> processAndGetSummary(RxSession session) {
        return session.readTransaction(tx -> {
            RxResult result = tx.run("MATCH (n) RETURN n LIMIT 1");
            Mono records = Flux.from((Publisher)result.records()).singleOrEmpty().map(record -> record.get(0).asNode());
            Mono summaryMono = Mono.from((Publisher)result.consume()).single();
            return records.then(summaryMono);
        });
    }
}

