/*
 * Decompiled with CFR 0.152.
 */
package com.scylladb.cdc.cql.driver3;

import com.google.common.flogger.FluentLogger;
import com.scylladb.cdc.cql.WorkerCQL;
import com.scylladb.cdc.cql.driver3.Driver3CommonCQL;
import com.scylladb.cdc.cql.driver3.Driver3RawChange;
import com.scylladb.cdc.cql.driver3.Driver3SchemaFactory;
import com.scylladb.cdc.cql.driver3.Driver3Session;
import com.scylladb.cdc.model.StreamId;
import com.scylladb.cdc.model.TableName;
import com.scylladb.cdc.model.worker.ChangeId;
import com.scylladb.cdc.model.worker.ChangeSchema;
import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.Task;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import shaded.com.scylladb.cdc.driver3.common.base.Preconditions;
import shaded.com.scylladb.cdc.driver3.common.util.concurrent.FutureCallback;
import shaded.com.scylladb.cdc.driver3.common.util.concurrent.Futures;
import shaded.com.scylladb.cdc.driver3.common.util.concurrent.MoreExecutors;
import shaded.com.scylladb.cdc.driver3.driver.core.ConsistencyLevel;
import shaded.com.scylladb.cdc.driver3.driver.core.Metadata;
import shaded.com.scylladb.cdc.driver3.driver.core.PreparedStatement;
import shaded.com.scylladb.cdc.driver3.driver.core.RegularStatement;
import shaded.com.scylladb.cdc.driver3.driver.core.ResultSet;
import shaded.com.scylladb.cdc.driver3.driver.core.Row;
import shaded.com.scylladb.cdc.driver3.driver.core.Session;
import shaded.com.scylladb.cdc.driver3.driver.core.querybuilder.QueryBuilder;

public final class Driver3WorkerCQL
implements WorkerCQL {
    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
    private final Session session;
    private final Map<TableName, PreparedStatement> preparedStmts = new HashMap<TableName, PreparedStatement>();
    private final ConsistencyLevel consistencyLevel;

    public Driver3WorkerCQL(Driver3Session session) {
        this.session = Preconditions.checkNotNull(session).getDriverSession();
        this.consistencyLevel = session.getConsistencyLevel();
    }

    private static RegularStatement getStmt(TableName table) {
        return QueryBuilder.select().all().from(Metadata.quoteIfNecessary(table.keyspace), Metadata.quoteIfNecessary(table.name + "_scylla_cdc_log")).where(QueryBuilder.eq(Metadata.quoteIfNecessary("cdc$stream_id"), QueryBuilder.bindMarker())).and(QueryBuilder.gt(Metadata.quoteIfNecessary("cdc$time"), QueryBuilder.bindMarker())).and(QueryBuilder.lte(Metadata.quoteIfNecessary("cdc$time"), QueryBuilder.bindMarker()));
    }

    private CompletableFuture<PreparedResult> prepare(final TableName table) {
        final CompletableFuture<PreparedResult> result = new CompletableFuture<PreparedResult>();
        Futures.addCallback(this.session.prepareAsync(Driver3WorkerCQL.getStmt(table)), new FutureCallback<PreparedStatement>(){

            @Override
            public void onSuccess(PreparedStatement r) {
                result.complete(new PreparedResult(table, r));
            }

            @Override
            public void onFailure(Throwable t2) {
                result.completeExceptionally(t2);
            }
        }, MoreExecutors.directExecutor());
        return result;
    }

    @Override
    public void prepare(Set<TableName> tables) throws InterruptedException, ExecutionException {
        CompletableFuture[] futures = (CompletableFuture[])tables.stream().filter(t2 -> !this.preparedStmts.containsKey(t2)).map(this::prepare).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).get();
        for (CompletableFuture f : futures) {
            PreparedResult r = (PreparedResult)f.get();
            this.preparedStmts.put(r.table, r.stmt);
        }
    }

    private CompletableFuture<WorkerCQL.Reader> query(PreparedStatement stmt, final Task task) {
        final CompletableFuture<WorkerCQL.Reader> result = new CompletableFuture<WorkerCQL.Reader>();
        List futures = task.streams.stream().map(StreamId::getValue).map(streamId -> this.session.executeAsync(stmt.bind(streamId, task.state.getWindowStart(), task.state.getWindowEnd()).setConsistencyLevel(this.consistencyLevel))).collect(Collectors.toList());
        ((FluentLogger.Api)logger.atFine()).log("Querying window: [%s, %s] for task: %s, task state: %s", (Object)task.state.getWindowStart(), (Object)task.state.getWindowEnd(), (Object)task.id, (Object)task.state);
        Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<ResultSet>>(){

            @Override
            public void onSuccess(List<ResultSet> rss) {
                result.complete(new Driver3MultiReader(rss, task.state.getLastConsumedChangeId()));
            }

            @Override
            public void onFailure(Throwable t2) {
                result.completeExceptionally(t2);
            }
        }, MoreExecutors.directExecutor());
        return result;
    }

    @Override
    public CompletableFuture<WorkerCQL.Reader> createReader(Task task) {
        PreparedStatement stmt = this.preparedStmts.get(task.id.getTable());
        Preconditions.checkNotNull(stmt);
        return this.query(stmt, task);
    }

    @Override
    public CompletableFuture<Optional<Long>> fetchTableTTL(TableName tableName) {
        return Driver3CommonCQL.fetchTableTTL(this.session, tableName);
    }

    private final class Driver3MultiReader
    implements WorkerCQL.Reader {
        private volatile List<Driver3Reader> readers;
        private AtomicInteger currentReaderIndex;

        public Driver3MultiReader(List<ResultSet> rss, Optional<ChangeId> lastChangeId) {
            this.readers = rss.stream().map(rs -> new Driver3Reader((ResultSet)rs, lastChangeId)).collect(Collectors.toList());
            this.currentReaderIndex = new AtomicInteger();
        }

        private void findNext(CompletableFuture<Optional<RawChange>> fut) {
            if (this.currentReaderIndex.get() >= this.readers.size()) {
                fut.complete(Optional.empty());
            }
            this.readers.get(this.currentReaderIndex.get()).nextChange().whenCompleteAsync((change, exception) -> {
                if (exception != null) {
                    fut.completeExceptionally((Throwable)exception);
                } else if (!change.isPresent()) {
                    this.currentReaderIndex.incrementAndGet();
                    this.findNext(fut);
                } else {
                    fut.complete((Optional<RawChange>)change);
                }
            });
        }

        @Override
        public CompletableFuture<Optional<RawChange>> nextChange() {
            CompletableFuture<Optional<RawChange>> result = new CompletableFuture<Optional<RawChange>>();
            this.findNext(result);
            return result;
        }
    }

    private final class Driver3Reader
    implements WorkerCQL.Reader {
        private volatile ResultSet rs;
        private volatile ChangeSchema schema;
        private final Optional<ChangeId> lastChangeId;

        public Driver3Reader(ResultSet rs, Optional<ChangeId> lastChangeId) {
            this.rs = Preconditions.checkNotNull(rs);
            this.lastChangeId = Preconditions.checkNotNull(lastChangeId);
        }

        private void findNext(final CompletableFuture<Optional<RawChange>> fut) {
            if (this.rs.getAvailableWithoutFetching() == 0) {
                if (this.rs.isFullyFetched()) {
                    fut.complete(Optional.empty());
                } else {
                    Futures.addCallback(this.rs.fetchMoreResults(), new FutureCallback<ResultSet>(){

                        @Override
                        public void onSuccess(ResultSet result) {
                            Driver3Reader.this.rs = result;
                            Driver3Reader.this.findNext(fut);
                        }

                        @Override
                        public void onFailure(Throwable t2) {
                            fut.completeExceptionally(t2);
                        }
                    }, MoreExecutors.directExecutor());
                }
            } else {
                Row row = (Row)this.rs.one();
                if (this.schema == null) {
                    try {
                        this.schema = Driver3SchemaFactory.getChangeSchema(row, Driver3WorkerCQL.this.session.getCluster().getMetadata());
                    }
                    catch (Driver3SchemaFactory.UnresolvableSchemaInconsistencyException ex) {
                        fut.completeExceptionally(ex);
                        return;
                    }
                }
                Driver3RawChange newChange = new Driver3RawChange(row, this.schema);
                if (!this.lastChangeId.isPresent() || newChange.getId().compareTo(this.lastChangeId.get()) > 0) {
                    fut.complete(Optional.of(newChange));
                } else {
                    this.findNext(fut);
                }
            }
        }

        @Override
        public CompletableFuture<Optional<RawChange>> nextChange() {
            CompletableFuture<Optional<RawChange>> result = new CompletableFuture<Optional<RawChange>>();
            this.findNext(result);
            return result;
        }
    }

    private static final class PreparedResult {
        public final TableName table;
        public final PreparedStatement stmt;

        public PreparedResult(TableName table, PreparedStatement stmt) {
            this.table = Preconditions.checkNotNull(table);
            this.stmt = Preconditions.checkNotNull(stmt);
        }
    }
}

