/*
 * Decompiled with CFR 0.152.
 */
package com.scylladb.cdc.cql.driver3;

import com.google.common.flogger.FluentLogger;
import com.scylladb.cdc.cql.BaseMasterCQL;
import com.scylladb.cdc.cql.driver3.Driver3CommonCQL;
import com.scylladb.cdc.cql.driver3.Driver3Session;
import com.scylladb.cdc.cql.driver3.FutureUtils;
import com.scylladb.cdc.model.TableName;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import shaded.com.scylladb.cdc.driver3.common.base.Preconditions;
import shaded.com.scylladb.cdc.driver3.common.util.concurrent.FutureCallback;
import shaded.com.scylladb.cdc.driver3.common.util.concurrent.Futures;
import shaded.com.scylladb.cdc.driver3.common.util.concurrent.ListenableFuture;
import shaded.com.scylladb.cdc.driver3.common.util.concurrent.MoreExecutors;
import shaded.com.scylladb.cdc.driver3.driver.core.ConsistencyLevel;
import shaded.com.scylladb.cdc.driver3.driver.core.KeyspaceMetadata;
import shaded.com.scylladb.cdc.driver3.driver.core.PreparedStatement;
import shaded.com.scylladb.cdc.driver3.driver.core.ResultSet;
import shaded.com.scylladb.cdc.driver3.driver.core.ResultSetFuture;
import shaded.com.scylladb.cdc.driver3.driver.core.Row;
import shaded.com.scylladb.cdc.driver3.driver.core.Session;
import shaded.com.scylladb.cdc.driver3.driver.core.Statement;
import shaded.com.scylladb.cdc.driver3.driver.core.TableMetadata;
import shaded.com.scylladb.cdc.driver3.driver.core.querybuilder.QueryBuilder;

/**
 * {@link BaseMasterCQL} implementation backed by a (shaded) driver 3 {@link Session}.
 *
 * <p>Generation timestamps and stream ids are read from the {@code system_distributed}
 * keyspace. Two table layouts are supported: the legacy (V1)
 * {@code cdc_streams_descriptions} table and the newer (V2)
 * {@code cdc_generation_timestamps} / {@code cdc_streams_descriptions_v2} pair. Which one is
 * queried is decided on every call by {@link #fetchShouldQueryLegacyTables()}.
 */
public final class Driver3MasterCQL
extends BaseMasterCQL {
    private static final FluentLogger logger = FluentLogger.forEnclosingClass();

    private static final String SYSTEM_DISTRIBUTED = "system_distributed";
    private static final String LEGACY_STREAMS_TABLE = "cdc_streams_descriptions";
    private static final String GENERATION_TIMESTAMPS_TABLE = "cdc_generation_timestamps";
    private static final String STREAMS_TABLE = "cdc_streams_descriptions_v2";

    private final Session session;

    // The caches below are assigned inside future callbacks, which may execute on a
    // different thread than the one that later reads them; volatile makes the cached
    // value visible across threads. A lost race only costs a redundant prepare/query.
    // NOTE(review): confirm the intended threading model of the callers.
    private volatile PreparedStatement fetchSmallestGenerationAfterStmt;
    private volatile PreparedStatement fetchStreamsStmt;
    private volatile boolean foundRewritten = false;
    private volatile PreparedStatement legacyFetchSmallestGenerationAfterStmt;
    private volatile PreparedStatement legacyFetchStreamsStmt;

    /**
     * Creates a master CQL facade over the driver session wrapped by {@code session}.
     *
     * @param session session wrapper; must not be {@code null}
     * @throws NullPointerException if {@code session} is {@code null}
     */
    public Driver3MasterCQL(Driver3Session session) {
        this.session = Preconditions.checkNotNull(session).getDriverSession();
    }

    /**
     * Decides whether the legacy (V1) streams description table should be queried.
     *
     * <ul>
     *   <li>only V1 present: {@code true};</li>
     *   <li>only V2 present: {@code false};</li>
     *   <li>neither present (including a missing {@code system_distributed} keyspace):
     *       the returned future fails with {@link IllegalStateException};</li>
     *   <li>both present: {@code false} once a {@code 'rewritten'} row has been seen in
     *       {@code system.cdc_local}, {@code true} otherwise.</li>
     * </ul>
     *
     * @return future holding {@code true} when the legacy tables should be used
     */
    private CompletableFuture<Boolean> fetchShouldQueryLegacyTables() {
        // Look the keyspace up once so both checks see the same metadata snapshot, and so a
        // missing keyspace is reported through the future instead of as a synchronous NPE.
        KeyspaceMetadata keyspace = this.session.getCluster().getMetadata().getKeyspace(SYSTEM_DISTRIBUTED);
        boolean hasNewTables = keyspace != null && keyspace.getTable(GENERATION_TIMESTAMPS_TABLE) != null;
        boolean hasLegacyTables = keyspace != null && keyspace.getTable(LEGACY_STREAMS_TABLE) != null;

        if (!hasLegacyTables && !hasNewTables) {
            CompletableFuture<Boolean> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Could not find any Scylla CDC stream description tables (either streams description table V1 or V2). Make sure you have Scylla CDC enabled."));
            return failed;
        }
        if (!hasNewTables) {
            logger.atFine().log("Using legacy (V1) streams description table, as a newer (V2) table was not found.");
            return CompletableFuture.completedFuture(true);
        }
        if (!hasLegacyTables) {
            logger.atFine().log("Using new (V2) streams description table, as a legacy (V1) table was not found.");
            return CompletableFuture.completedFuture(false);
        }

        // Both layouts exist: V2 is only used once a 'rewritten' marker row has been seen.
        if (this.foundRewritten) {
            logger.atFiner().log("Using new (V2) streams description table, because a 'rewritten' row was found previously.");
            return CompletableFuture.completedFuture(false);
        }
        return this.executeOne(this.getFetchRewritten()).thenApply(fetchedRewritten -> {
            if (fetchedRewritten.isPresent()) {
                // Remember the answer so later calls skip the marker query.
                this.foundRewritten = true;
                logger.atInfo().log("Found a 'rewritten' row. Will use new (V2) streams description table from now on.");
                return false;
            }
            logger.atWarning().log("Using legacy (V1) streams description table, even though newer (V2) table was found, but a 'rewritten' row is still missing. This might mean that the rewriting process is still pending or you have disabled streams description rewriting - in that case the library will not switch to the new (V2) table until it discovers a 'rewritten' row. Read more at: https://github.com/scylladb/scylla/blob/master/docs/design-notes/cdc.md#streams-description-table-v1-and-rewriting");
            return true;
        });
    }

    /**
     * Returns the (lazily prepared and cached) statement selecting {@code min(time)} from the
     * legacy table for rows with {@code time} greater than the bound value.
     */
    private CompletableFuture<PreparedStatement> getLegacyFetchSmallestGenerationAfter() {
        PreparedStatement cached = this.legacyFetchSmallestGenerationAfterStmt;
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        ListenableFuture<PreparedStatement> preparing = this.session.prepareAsync(QueryBuilder.select().min(QueryBuilder.column("time")).from(SYSTEM_DISTRIBUTED, LEGACY_STREAMS_TABLE).where(QueryBuilder.gt("time", QueryBuilder.bindMarker())).allowFiltering());
        return FutureUtils.convert(preparing).thenApply(prepared -> {
            this.legacyFetchSmallestGenerationAfterStmt = prepared;
            return prepared;
        });
    }

    /**
     * Returns the (lazily prepared and cached) statement selecting {@code min(time)} from the
     * V2 generation timestamps table ({@code key = 'timestamps'}) for rows with {@code time}
     * greater than the bound value.
     */
    private CompletableFuture<PreparedStatement> getFetchSmallestGenerationAfter() {
        PreparedStatement cached = this.fetchSmallestGenerationAfterStmt;
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        ListenableFuture<PreparedStatement> preparing = this.session.prepareAsync(QueryBuilder.select().min(QueryBuilder.column("time")).from(SYSTEM_DISTRIBUTED, GENERATION_TIMESTAMPS_TABLE).where(QueryBuilder.eq("key", "timestamps")).and(QueryBuilder.gt("time", QueryBuilder.bindMarker())));
        return FutureUtils.convert(preparing).thenApply(prepared -> {
            this.fetchSmallestGenerationAfterStmt = prepared;
            return prepared;
        });
    }

    /**
     * Returns the (lazily prepared and cached) statement selecting the {@code streams} column
     * from the legacy table for the bound {@code time}.
     */
    private CompletableFuture<PreparedStatement> getLegacyFetchStreams() {
        PreparedStatement cached = this.legacyFetchStreamsStmt;
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        ListenableFuture<PreparedStatement> preparing = this.session.prepareAsync(QueryBuilder.select().column("streams").from(SYSTEM_DISTRIBUTED, LEGACY_STREAMS_TABLE).where(QueryBuilder.eq("time", QueryBuilder.bindMarker())).allowFiltering());
        return FutureUtils.convert(preparing).thenApply(prepared -> {
            this.legacyFetchStreamsStmt = prepared;
            return prepared;
        });
    }

    /**
     * Returns the (lazily prepared and cached) statement selecting the {@code streams} column
     * from the V2 streams table for the bound {@code time}.
     */
    private CompletableFuture<PreparedStatement> getFetchStreams() {
        PreparedStatement cached = this.fetchStreamsStmt;
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        ListenableFuture<PreparedStatement> preparing = this.session.prepareAsync(QueryBuilder.select().column("streams").from(SYSTEM_DISTRIBUTED, STREAMS_TABLE).where(QueryBuilder.eq("time", QueryBuilder.bindMarker())));
        return FutureUtils.convert(preparing).thenApply(prepared -> {
            this.fetchStreamsStmt = prepared;
            return prepared;
        });
    }

    /** Builds the query for the {@code 'rewritten'} marker row in {@code system.cdc_local}. */
    private Statement getFetchRewritten() {
        return QueryBuilder.select().from("system", "cdc_local").where(QueryBuilder.eq("key", "rewritten"));
    }

    /** Picks {@code QUORUM} when the cluster metadata lists more than one host, else {@code ONE}. */
    private ConsistencyLevel computeCL() {
        return this.session.getCluster().getMetadata().getAllHosts().size() > 1 ? ConsistencyLevel.QUORUM : ConsistencyLevel.ONE;
    }

    /**
     * Registers a callback on {@code source} that hands a successful result set to
     * {@code handler} and forwards any failure to {@code target}.
     *
     * @param source  driver future to observe
     * @param handler invoked with the result set on success
     * @param target  future completed exceptionally when {@code source} fails
     */
    private static void onResultSet(ListenableFuture<ResultSet> source, final Consumer<ResultSet> handler, final CompletableFuture<?> target) {
        Futures.addCallback(source, new FutureCallback<ResultSet>(){

            @Override
            public void onSuccess(ResultSet rs) {
                handler.accept(rs);
            }

            @Override
            public void onFailure(Throwable t) {
                target.completeExceptionally(t);
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Completes {@code result} with the single row of {@code rs}, or with an empty
     * {@link Optional} when the result set is exhausted without yielding a row. Fetches
     * further pages while no row is available yet.
     */
    private void consumeOneResult(ResultSet rs, CompletableFuture<Optional<Row>> result) {
        int availCount = rs.getAvailableWithoutFetching();
        if (availCount > 0) {
            // Callers only issue queries expected to yield at most one row.
            assert (availCount == 1);
            result.complete(Optional.of(rs.one()));
            return;
        }
        if (rs.isFullyFetched()) {
            result.complete(Optional.empty());
            return;
        }
        onResultSet(rs.fetchMoreResults(), next -> this.consumeOneResult(next, result), result);
    }

    /**
     * Drains {@code rs} page by page into {@code alreadyFetched} and completes
     * {@code result} with that collection once the result set is fully fetched.
     */
    private void consumeManyResults(ResultSet rs, Collection<Row> alreadyFetched, CompletableFuture<Collection<Row>> result) {
        int availableWithoutFetching = rs.getAvailableWithoutFetching();
        if (availableWithoutFetching > 0) {
            for (int i = 0; i < availableWithoutFetching; ++i) {
                alreadyFetched.add(rs.one());
            }
            // Re-check: either more rows became available, or we must finish / fetch a page.
            this.consumeManyResults(rs, alreadyFetched, result);
            return;
        }
        if (rs.isFullyFetched()) {
            result.complete(alreadyFetched);
            return;
        }
        onResultSet(rs.fetchMoreResults(), next -> this.consumeManyResults(next, alreadyFetched, result), result);
    }

    /**
     * Executes {@code stmt} with the consistency level from {@link #computeCL()} and returns
     * its single row, if any.
     */
    private CompletableFuture<Optional<Row>> executeOne(Statement stmt) {
        CompletableFuture<Optional<Row>> result = new CompletableFuture<>();
        ResultSetFuture future = this.session.executeAsync(stmt.setConsistencyLevel(this.computeCL()));
        onResultSet(future, rs -> this.consumeOneResult(rs, result), result);
        return result;
    }

    /**
     * Executes {@code stmt} with the consistency level from {@link #computeCL()} and returns
     * all of its rows.
     */
    private CompletableFuture<Collection<Row>> executeMany(Statement stmt) {
        CompletableFuture<Collection<Row>> result = new CompletableFuture<>();
        ResultSetFuture future = this.session.executeAsync(stmt.setConsistencyLevel(this.computeCL()));
        onResultSet(future, rs -> this.consumeManyResults(rs, new ArrayList<Row>(), result), result);
        return result;
    }

    /**
     * Extracts the stream ids from the legacy-table row of a generation.
     *
     * @throws NoSuchElementException if no row was found for {@code generationStart}
     */
    private static Set<ByteBuffer> extractLegacyStreams(Optional<Row> row, Date generationStart) {
        if (!row.isPresent()) {
            throw new NoSuchElementException("No streams description found for generation " + generationStart + " in " + SYSTEM_DISTRIBUTED + "." + LEGACY_STREAMS_TABLE + ".");
        }
        return row.get().getSet(0, ByteBuffer.class);
    }

    /**
     * Fetches the smallest generation timestamp strictly greater than {@code after}, from the
     * legacy or V2 table as decided by {@link #fetchShouldQueryLegacyTables()}.
     *
     * @param after lower (exclusive) bound bound into the {@code time > ?} predicate
     * @return future holding the value of the first result column, or empty when the query
     *         produced no row
     */
    @Override
    protected CompletableFuture<Optional<Date>> fetchSmallestGenerationAfter(Date after) {
        return this.fetchShouldQueryLegacyTables().thenCompose(shouldQueryLegacyTables -> {
            // Only the prepared statement differs between the two layouts.
            CompletableFuture<PreparedStatement> prepared = shouldQueryLegacyTables.booleanValue()
                    ? this.getLegacyFetchSmallestGenerationAfter()
                    : this.getFetchSmallestGenerationAfter();
            return prepared
                    .thenCompose(statement -> this.executeOne(statement.bind(after)))
                    .thenApply(row -> row.map(r -> r.getTimestamp(0)));
        });
    }

    /**
     * Fetches the stream ids of the generation starting at {@code generationStart}.
     *
     * <p>The legacy table is read as a single row; the V2 table may spread a generation over
     * several rows, whose {@code streams} sets are merged.
     *
     * @param generationStart generation timestamp bound into the {@code time = ?} predicate
     * @return future holding the stream ids; on the legacy path it fails with
     *         {@link NoSuchElementException} when no row exists for the generation
     */
    @Override
    protected CompletableFuture<Set<ByteBuffer>> fetchStreamsForGeneration(Date generationStart) {
        return this.fetchShouldQueryLegacyTables().thenCompose(shouldQueryLegacyTables -> {
            if (shouldQueryLegacyTables.booleanValue()) {
                return this.getLegacyFetchStreams().thenCompose(statement -> this.executeOne(statement.bind(generationStart)).thenApply(row -> extractLegacyStreams(row, generationStart)));
            }
            return this.getFetchStreams().thenCompose(statement -> this.executeMany(statement.bind(generationStart)).thenApply(rows -> rows.stream().flatMap(r -> r.getSet(0, ByteBuffer.class).stream()).collect(Collectors.toSet())));
        });
    }

    /** Delegates to {@link Driver3CommonCQL#fetchTableTTL} using this instance's session. */
    @Override
    public CompletableFuture<Optional<Long>> fetchTableTTL(TableName tableName) {
        return Driver3CommonCQL.fetchTableTTL(this.session, tableName);
    }

    /**
     * Checks, against the driver's cluster metadata, that {@code table} exists and has
     * Scylla CDC enabled.
     *
     * @param table table to validate
     * @return an already-completed future holding an {@link IllegalArgumentException}
     *         describing the first problem found, or empty when the table is valid
     */
    @Override
    public CompletableFuture<Optional<Throwable>> validateTable(TableName table) {
        KeyspaceMetadata keyspaceMetadata = this.session.getCluster().getMetadata().getKeyspace(table.keyspace);
        if (keyspaceMetadata == null) {
            return CompletableFuture.completedFuture(Optional.of(new IllegalArgumentException(String.format("Did not find table '%s.%s' in Scylla cluster - missing keyspace '%s'.", table.keyspace, table.name, table.keyspace))));
        }
        TableMetadata tableMetadata = keyspaceMetadata.getTable(table.name);
        if (tableMetadata == null) {
            return CompletableFuture.completedFuture(Optional.of(new IllegalArgumentException(String.format("Did not find table '%s.%s' in Scylla cluster.", table.keyspace, table.name))));
        }
        if (!tableMetadata.getOptions().isScyllaCDC()) {
            return CompletableFuture.completedFuture(Optional.of(new IllegalArgumentException(String.format("The table '%s.%s' does not have Scylla CDC enabled.", table.keyspace, table.name))));
        }
        return CompletableFuture.completedFuture(Optional.empty());
    }
}

