/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.datacloud.jdbc.core.listener;

import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor;
import com.salesforce.datacloud.jdbc.core.StreamingResultSet;
import com.salesforce.datacloud.jdbc.core.listener.AdaptiveQueryStatusPoller;
import com.salesforce.datacloud.jdbc.core.listener.AsyncQueryStatusPoller;
import com.salesforce.datacloud.jdbc.core.listener.QueryStatusListener;
import com.salesforce.datacloud.jdbc.core.listener.QueryStatusPoller;
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import com.salesforce.datacloud.jdbc.util.ThrowingSupplier;
import com.salesforce.hyperdb.grpc.ExecuteQueryResponse;
import com.salesforce.hyperdb.grpc.QueryResult;
import com.salesforce.hyperdb.grpc.QueryStatus;
import io.grpc.StatusRuntimeException;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdaptiveQueryStatusListener
implements QueryStatusListener {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AdaptiveQueryStatusListener.class);
    private final String queryId;
    private final String query;
    private final HyperGrpcClientExecutor client;
    private final Duration timeout;
    private final Iterator<ExecuteQueryResponse> response;
    private final AdaptiveQueryStatusPoller headPoller;
    private final AsyncQueryStatusPoller tailPoller;

    public static AdaptiveQueryStatusListener of(String query, HyperGrpcClientExecutor client, Duration timeout) throws SQLException {
        try {
            Iterator<ExecuteQueryResponse> response = client.executeAdaptiveQuery(query);
            String queryId = response.next().getQueryInfo().getQueryStatus().getQueryId();
            return new AdaptiveQueryStatusListener(queryId, query, client, timeout, response, new AdaptiveQueryStatusPoller(queryId, client), new AsyncQueryStatusPoller(queryId, client));
        }
        catch (StatusRuntimeException ex) {
            throw QueryExceptionHandler.createException("Failed to execute query: " + query, (Exception)((Object)ex));
        }
    }

    @Override
    public boolean isReady() {
        return true;
    }

    @Override
    public String getStatus() {
        QueryStatusPoller poller = this.headPoller.pollChunkCount() > 1L ? this.tailPoller : this.headPoller;
        return Optional.of(poller).map(QueryStatusPoller::pollQueryStatus).map(QueryStatus::getCompletionStatus).map(Enum::name).orElse(QueryStatus.CompletionStatus.RUNNING.name());
    }

    @Override
    public DataCloudResultSet generateResultSet() {
        return StreamingResultSet.of(this.query, this);
    }

    @Override
    public Stream<QueryResult> stream() throws SQLException {
        return Stream.of(this::head, ThrowingSupplier.rethrowSupplier(this::tail)).flatMap(Supplier::get);
    }

    private Stream<QueryResult> head() {
        return StreamUtilities.toStream(this.response).map(this.headPoller::map).filter(Optional::isPresent).map(Optional::get);
    }

    private Stream<QueryResult> tail() throws SQLException {
        return StreamUtilities.lazyLimitedStream(this::infiniteChunks, ThrowingSupplier.rethrowLongSupplier(this::getChunkLimit)).flatMap(UnaryOperator.identity());
    }

    private Stream<Stream<QueryResult>> infiniteChunks() {
        return LongStream.iterate(1L, n -> n + 1L).mapToObj(this::tryGetQueryResult);
    }

    private long getChunkLimit() throws SQLException {
        if (this.headPoller.pollChunkCount() > 1L) {
            this.blockUntilReady(this.tailPoller, this.timeout);
            return this.tailPoller.pollChunkCount() - 1L;
        }
        return 0L;
    }

    private Stream<QueryResult> tryGetQueryResult(long chunkId) {
        return StreamUtilities.tryTimes(3, () -> this.client.getQueryResult(this.queryId, chunkId, true), throwable -> log.warn("Error when getting chunk for query. queryId={}, chunkId={}", new Object[]{this.queryId, chunkId, throwable})).map(StreamUtilities::toStream).orElse(Stream.empty());
    }

    private void blockUntilReady(QueryStatusPoller poller, Duration timeout) {
        Instant end = Instant.now().plus(timeout);
        int millis = 1000;
        while (!poller.pollIsReady() && Instant.now().isBefore(end)) {
            log.info("Waiting for additional query results. queryId={}, timeout={}, sleep={}", new Object[]{this.queryId, timeout, Duration.ofSeconds(millis)});
            Thread.sleep(millis);
            millis *= 2;
        }
        if (!this.tailPoller.pollIsReady()) {
            throw new DataCloudJDBCException("Results were requested before ready. queryId=" + this.queryId + ", timeout=" + timeout);
        }
    }

    @Generated
    private AdaptiveQueryStatusListener(String queryId, String query, HyperGrpcClientExecutor client, Duration timeout, Iterator<ExecuteQueryResponse> response, AdaptiveQueryStatusPoller headPoller, AsyncQueryStatusPoller tailPoller) {
        this.queryId = queryId;
        this.query = query;
        this.client = client;
        this.timeout = timeout;
        this.response = response;
        this.headPoller = headPoller;
        this.tailPoller = tailPoller;
    }

    @Override
    @Generated
    public String getQueryId() {
        return this.queryId;
    }

    @Override
    @Generated
    public String getQuery() {
        return this.query;
    }
}

