/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.mongodb;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.execution.QueryStats;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableHandle;
import io.trino.operator.OperatorStats;
import io.trino.plugin.mongodb.MongoQueryRunner;
import io.trino.plugin.mongodb.MongoServer;
import io.trino.security.AccessControl;
import io.trino.security.AllowAllAccessControl;
import io.trino.spi.QueryId;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;
import io.trino.split.SplitSource;
import io.trino.sql.planner.OptimizerConfig;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;
import io.trino.transaction.TransactionId;
import io.trino.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.assertj.core.api.Assertions;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestMongoDynamicFiltering
extends AbstractTestQueryFramework {
    protected QueryRunner createQueryRunner() throws Exception {
        MongoServer server = (MongoServer)this.closeAfterClass(new MongoServer());
        return MongoQueryRunner.builder(server).addConnectorProperties(Map.of("mongodb.dynamic-filtering.wait-timeout", "1h")).setInitialTables(List.of(TpchTable.LINE_ITEM, TpchTable.ORDERS, TpchTable.PART)).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Timeout(value=30L)
    public void testIncompleteDynamicFilterTimeout() throws Exception {
        QueryRunner runner = this.getQueryRunner();
        TransactionManager transactionManager = runner.getTransactionManager();
        TransactionId transactionId = transactionManager.beginTransaction(false);
        Session session = Session.builder((Session)this.getSession()).setCatalogSessionProperty("mongodb", "dynamic_filtering_wait_timeout", "1s").build().beginTransactionId(transactionId, transactionManager, (AccessControl)new AllowAllAccessControl());
        QualifiedObjectName tableName = new QualifiedObjectName("mongodb", "tpch", "orders");
        Optional tableHandle = runner.getPlannerContext().getMetadata().getTableHandle(session, tableName);
        Assertions.assertThat((Optional)tableHandle).isPresent();
        CompletableFuture<Object> dynamicFilterBlocked = new CompletableFuture<Object>();
        try {
            SplitSource splitSource = runner.getSplitManager().getSplits(session, Span.getInvalid(), (TableHandle)tableHandle.get(), (DynamicFilter)new BlockedDynamicFilter(dynamicFilterBlocked), Constraint.alwaysTrue());
            ArrayList splits = new ArrayList();
            while (!splitSource.isFinished()) {
                splits.addAll(((SplitSource.SplitBatch)splitSource.getNextBatch(1000).get()).getSplits());
            }
            splitSource.close();
            Assertions.assertThat(splits).isNotEmpty();
        }
        finally {
            dynamicFilterBlocked.complete(null);
        }
    }

    @Test
    public void testJoinDynamicFilteringSingleValue() {
        this.assertDynamicFiltering("SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.comment = 'nstructions sleep furiously among '", this.withBroadcastJoin(), 6, 6);
    }

    @Test
    public void testJoinDynamicFilteringBlockProbeSide() {
        this.assertDynamicFiltering("SELECT l.comment FROM  lineitem l, orders o WHERE l.orderkey = o.orderkey AND o.comment = 'nstructions sleep furiously among '", this.withBroadcastJoinNonReordering(), 6, 6);
    }

    private void assertDynamicFiltering(@Language(value="SQL") String selectQuery, Session session, int expectedRowCount, int ... expectedOperatorRowsRead) {
        DistributedQueryRunner runner = this.getDistributedQueryRunner();
        QueryRunner.MaterializedResultWithPlan result = runner.executeWithPlan(session, selectQuery);
        Assertions.assertThat((int)result.result().getRowCount()).isEqualTo(expectedRowCount);
        Assertions.assertThat(TestMongoDynamicFiltering.getOperatorRowsRead((QueryRunner)runner, result.queryId())).isEqualTo((Object)Ints.asList((int[])expectedOperatorRowsRead));
    }

    private Session withBroadcastJoin() {
        return Session.builder((Session)this.getSession()).setSystemProperty("join_distribution_type", OptimizerConfig.JoinDistributionType.BROADCAST.name()).build();
    }

    private Session withBroadcastJoinNonReordering() {
        return Session.builder((Session)this.getSession()).setSystemProperty("join_distribution_type", OptimizerConfig.JoinDistributionType.BROADCAST.name()).setSystemProperty("join_reordering_strategy", OptimizerConfig.JoinReorderingStrategy.NONE.name()).build();
    }

    private static List<Integer> getOperatorRowsRead(QueryRunner runner, QueryId queryId) {
        QueryStats stats = runner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats();
        return (List)stats.getOperatorSummaries().stream().filter(summary -> summary.getDynamicFilterSplitsProcessed() > 0L).map(OperatorStats::getInputPositions).map(Math::toIntExact).collect(ImmutableList.toImmutableList());
    }

    private static class BlockedDynamicFilter
    implements DynamicFilter {
        private final CompletableFuture<?> isBlocked;

        public BlockedDynamicFilter(CompletableFuture<?> isBlocked) {
            this.isBlocked = Objects.requireNonNull(isBlocked, "isBlocked is null");
        }

        public Set<ColumnHandle> getColumnsCovered() {
            return ImmutableSet.of();
        }

        public CompletableFuture<?> isBlocked() {
            return this.isBlocked;
        }

        public boolean isComplete() {
            return false;
        }

        public boolean isAwaitable() {
            return true;
        }

        public TupleDomain<ColumnHandle> getCurrentPredicate() {
            return TupleDomain.all();
        }
    }
}

