/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator.join.unspilled;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.trino.ExceededMemoryLimitException;
import io.trino.RowPagesBuilder;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.connector.CatalogServiceProvider;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.NodeSelectorFactory;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.operator.DriverContext;
import io.trino.operator.Operator;
import io.trino.operator.OperatorAssertion;
import io.trino.operator.OperatorContext;
import io.trino.operator.OperatorFactories;
import io.trino.operator.OperatorFactory;
import io.trino.operator.ProcessorContext;
import io.trino.operator.TaskContext;
import io.trino.operator.TrinoOperatorFactories;
import io.trino.operator.WorkProcessor;
import io.trino.operator.WorkProcessorOperator;
import io.trino.operator.WorkProcessorOperatorFactory;
import io.trino.operator.join.JoinBridgeManager;
import io.trino.operator.join.unspilled.JoinTestUtils;
import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.LazyBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.DataProviders;
import io.trino.testing.MaterializedResult;
import io.trino.testing.TestingTaskContext;
import io.trino.type.BlockTypeOperators;
import io.trino.util.FinalizerService;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestHashJoinOperator {
    // Shared block-level type operators; handed to operatorFactories.join(...) by the tests that build the join factory directly.
    private static final BlockTypeOperators TYPE_OPERATOR_FACTORY = new BlockTypeOperators(new TypeOperators());
    // Operator factory implementation under test; supplied through the protected constructor and never null.
    private final OperatorFactories operatorFactories;
    // Created in setUp() and shut down in tearDown(); passed to JoinTestUtils.buildLookupSource by the tests.
    private ExecutorService executor;
    // Created in setUp() and shut down in tearDown().
    private ScheduledExecutorService scheduledExecutor;
    // Created in setUp(); passed to JoinTestUtils.setupBuildSide by the tests.
    private NodePartitioningManager nodePartitioningManager;

    /**
     * Creates the test using a fresh {@link TrinoOperatorFactories} as the factory set under test.
     */
    public TestHashJoinOperator() {
        this(new TrinoOperatorFactories());
    }

    /**
     * Creates the test against a caller-supplied set of operator factories.
     *
     * @param operatorFactories factory set used to create the join operators; must not be null
     * @throws NullPointerException if {@code operatorFactories} is null
     */
    protected TestHashJoinOperator(OperatorFactories operatorFactories) {
        Objects.requireNonNull(operatorFactories, "operatorFactories is null");
        this.operatorFactories = operatorFactories;
    }

    /**
     * Creates fresh executors and a node partitioning manager before every test method.
     */
    @BeforeMethod
    public void setUp() {
        // Unbounded, cached-style pool; rejected tasks are silently discarded (DiscardPolicy)
        // instead of raising RejectedExecutionException.
        executor = new ThreadPoolExecutor(
                0,
                Integer.MAX_VALUE,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                Threads.daemonThreadsNamed("test-executor-%s"),
                new ThreadPoolExecutor.DiscardPolicy());

        String scheduledThreadNameFormat = getClass().getSimpleName() + "-scheduledExecutor-%s";
        scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed(scheduledThreadNameFormat));

        // Node scheduler over an in-memory node manager with no extra nodes, coordinator included.
        InMemoryNodeManager nodeManager = new InMemoryNodeManager(new InternalNode[0]);
        NodeSchedulerConfig schedulerConfig = new NodeSchedulerConfig().setIncludeCoordinator(true);
        NodeTaskMap nodeTaskMap = new NodeTaskMap(new FinalizerService());
        NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, schedulerConfig, nodeTaskMap));

        nodePartitioningManager = new NodePartitioningManager(
                nodeScheduler,
                new BlockTypeOperators(new TypeOperators()),
                CatalogServiceProvider.fail());
    }

    /**
     * Stops both executors created by {@link #setUp()} after every test method,
     * so that no worker threads outlive the test that started them.
     */
    @AfterMethod(alwaysRun = true)
    public void tearDown() {
        executor.shutdownNow();
        scheduledExecutor.shutdownNow();
    }

    /**
     * Inner join keyed on the first (VARCHAR) channel: a 10-row build sequence joined with a
     * 1000-row probe sequence. The expected output is the ten rows with keys "20" through "29".
     */
    @Test(dataProvider = "hashJoinTestValues")
    public void testInnerJoin(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = createTaskContext();

        // build side
        List<Type> buildTypes = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .addSequencePage(10, 20, 30, 40);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.empty());
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side
        List<Type> probeTypes = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .addSequencePage(1000, 0, 1000, 2000)
                .build();
        OperatorFactory joinOperatorFactory = JoinTestUtils.innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        // expected: probe columns followed by build columns
        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probePages.getTypesWithoutHash(), buildPages.getTypesWithoutHash()))
                .row("20", 1020L, 2020L, "20", 30L, 40L)
                .row("21", 1021L, 2021L, "21", 31L, 41L)
                .row("22", 1022L, 2022L, "22", 32L, 42L)
                .row("23", 1023L, 2023L, "23", 33L, 43L)
                .row("24", 1024L, 2024L, "24", 34L, 44L)
                .row("25", 1025L, 2025L, "25", 35L, 45L)
                .row("26", 1026L, 2026L, "26", 36L, 46L)
                .row("27", 1027L, 2027L, "27", 37L, 47L)
                .row("28", 1028L, 2028L, "28", 38L, 48L)
                .row("29", 1029L, 2029L, "29", 39L, 49L)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Inner join where every probe page is a single run-length-encoded BIGINT block.
     * Going by the expected rows, key 20 has one build match and key 21 has two, while
     * the -1 probe page matches nothing.
     */
    @Test(dataProvider = "singleBigintLookupSourceProvider")
    public void testInnerJoinWithRunLengthEncodedProbe(boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();

        // build side: two overlapping 10-row sequence pages
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(false, Ints.asList(0), buildTypes)
                .addSequencePage(10, 20)
                .addSequencePage(10, 21);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                false,
                taskContext,
                buildPages,
                Optional.empty(),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: three RLE pages of two positions each
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(false, Ints.asList(0), probeTypes);
        List<Page> probeInput = ImmutableList.of(
                new Page(RunLengthEncodedBlock.create(BigintType.BIGINT, 20L, 2)),
                new Page(RunLengthEncodedBlock.create(BigintType.BIGINT, -1L, 2)),
                new Page(RunLengthEncodedBlock.create(BigintType.BIGINT, 21L, 2)));
        OperatorFactory joinOperatorFactory = JoinTestUtils.innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probePages.getTypesWithoutHash(), buildPages.getTypesWithoutHash()))
                .row(20L, 20L)
                .row(20L, 20L)
                .row(21L, 21L)
                .row(21L, 21L)
                .row(21L, 21L)
                .row(21L, 21L)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Feeds the join a probe page whose second channel is wrapped in a {@link LazyBlock} and
     * checks that the corresponding output channel is no longer a {@code LazyBlock}.
     */
    @Test(dataProvider = "singleBigintLookupSourceProvider")
    public void testUnwrapsLazyBlocks(boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();

        // The filter touches channel 1 of the page it is given, forcing that block to load.
        JoinTestUtils.TestInternalJoinFilterFunction filterFunction = new JoinTestUtils.TestInternalJoinFilterFunction(
                (leftPosition, leftPage, rightPosition, rightPage) -> {
                    rightPage.getBlock(1).getLoadedBlock();
                    return true;
                });

        // build side
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(false, Ints.asList(0), buildTypes)
                .addSequencePage(1, 0);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                true,
                taskContext,
                buildPages,
                Optional.of(filterFunction),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: wrap channel 1 of every page in a LazyBlock
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(false, Ints.asList(0), probeTypes);
        List<Page> eagerProbePages = probePages.addSequencePage(1, 0, 0).build();
        List<Page> probeInput = eagerProbePages.stream()
                .map(page -> new Page(page.getBlock(0), new LazyBlock(1, () -> page.getBlock(1))))
                .collect(ImmutableList.toImmutableList());

        OperatorFactory joinOperatorFactory = operatorFactories.join(
                OperatorFactories.JoinOperatorType.innerJoin(false, false),
                0,
                new PlanNodeId("test"),
                lookupSourceFactory,
                true,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        Operator operator = joinOperatorFactory.createOperator(driverContext);
        Assert.assertTrue(operator.needsInput());
        operator.addInput(probeInput.get(0));
        operator.finish();

        Page output = operator.getOutput();
        Assert.assertFalse(output.getBlock(1) instanceof LazyBlock);
    }

    /**
     * Checks that the join operator yields after a single filter-function invocation when the
     * yield signal is forced, and that it still produces the full output page afterwards.
     */
    @Test(dataProvider = "singleBigintLookupSourceProvider")
    public void testYield(boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();

        // Every filter invocation is counted and forces a yield.
        AtomicInteger filterFunctionCalls = new AtomicInteger();
        JoinTestUtils.TestInternalJoinFilterFunction filterFunction = new JoinTestUtils.TestInternalJoinFilterFunction(
                (leftPosition, leftPage, rightPosition, rightPage) -> {
                    filterFunctionCalls.incrementAndGet();
                    driverContext.getYieldSignal().forceYieldForTesting();
                    return true;
                });

        // build side with `entries` rows
        int entries = 40;
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(true, Ints.asList(0), buildTypes)
                .addSequencePage(entries, 42);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                true,
                taskContext,
                buildPages,
                Optional.of(filterFunction),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(false, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages.addSequencePage(100, 0).build();
        OperatorFactory joinOperatorFactory = operatorFactories.join(
                OperatorFactories.JoinOperatorType.innerJoin(false, false),
                0,
                new PlanNodeId("test"),
                lookupSourceFactory,
                true,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        Operator operator = joinOperatorFactory.createOperator(driverContext);
        Assert.assertTrue(operator.needsInput());
        operator.addInput(probeInput.get(0));
        operator.finish();

        // One yield (and no output) is expected per build entry, each after exactly one filter call.
        long yieldDelayNanos = 5L * TimeUnit.SECONDS.toNanos(1L);
        for (int round = 0; round < entries; round++) {
            driverContext.getYieldSignal().setWithDelay(yieldDelayNanos, driverContext.getYieldExecutor());
            filterFunctionCalls.set(0);
            Assert.assertNull(operator.getOutput());
            Assert.assertEquals(filterFunctionCalls.get(), 1, "Expected join to stop processing (yield) after calling filter function once");
            driverContext.getYieldSignal().reset();
        }

        // With only a delayed yield armed, the page must show up within at most five getOutput() calls.
        driverContext.getYieldSignal().setWithDelay(yieldDelayNanos, driverContext.getYieldExecutor());
        Page output = null;
        int attempts = 0;
        while (output == null && attempts < 5) {
            output = operator.getOutput();
            attempts++;
        }
        Assert.assertNotNull(output);
        driverContext.getYieldSignal().reset();

        // all build entries are present in the output
        Assert.assertEquals(output.getPositionCount(), entries);
    }

    /**
     * Inner join where the probe side contains null keys: the null probe rows are expected to
     * produce no output, so only the rows for keys 1, 1 and 2 remain.
     */
    @Test(dataProvider = "hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testInnerJoinWithNullProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();

        // build side: 1, 2, 3
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row(2L)
                .row(3L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.empty(),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: 1, null, null, 1, 2
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(new Object[]{null})
                .row(new Object[]{null})
                .row(1L)
                .row(2L)
                .build();
        OperatorFactory joinOperatorFactory = JoinTestUtils.innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildPages.getTypesWithoutHash()))
                .row(1L, 1L)
                .row(1L, 1L)
                .row(2L, 2L)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Inner join against a build side with a duplicated key (1, 1, 2), expecting exactly one
     * output row per matching probe row.
     */
    @Test(dataProvider = "hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testInnerJoinWithOutputSingleMatch(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();

        // build side: 1, 1, 2
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row(1L)
                .row(2L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.empty(),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: 1, 2, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row(3L)
                .build();
        // NOTE(review): the (true, false) flags presumably mean "output single match, no filter" — confirm against JoinTestUtils.
        OperatorFactory joinOperatorFactory = JoinTestUtils.innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, true, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, 1L)
                .row(2L, 2L)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Inner join where the build side contains null keys (1, null, null, 1, 2). The probe rows
     * 1, 2, 3 are expected to match only the non-null build rows, giving (1,1), (1,1), (2,2).
     *
     * @param parallelBuild whether the build side is set up in parallel
     * @param probeHashEnabled whether the probe pages are built with hashing enabled
     * @param buildHashEnabled whether the build pages are built with hashing enabled
     * @param singleBigintLookupSource lookup-source variant to exercise; forwarded to
     *        {@code JoinTestUtils.setupBuildSide} so that both data-provider values really
     *        run different configurations
     */
    @Test(dataProvider = "hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testInnerJoinWithNullBuild(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();

        // build side: 1, null, null, 1, 2
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row(new Object[]{null})
                .row(new Object[]{null})
                .row(1L)
                .row(2L);
        // The data-provider flag must be forwarded here; without it the parameter is ignored
        // and each combination is merely executed twice with the same lookup source.
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.empty(),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: 1, 2, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = JoinTestUtils.innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, 1L)
                .row(1L, 1L)
                .row(2L, 2L)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Inner join with null keys on both sides: build rows 1, null, null, 1, 2 against probe rows
     * 1, 2, null, 3. Null keys are expected never to match each other, so the result is
     * (1,1), (1,1), (2,2).
     *
     * @param parallelBuild whether the build side is set up in parallel
     * @param probeHashEnabled whether the probe pages are built with hashing enabled
     * @param buildHashEnabled whether the build pages are built with hashing enabled
     * @param singleBigintLookupSource lookup-source variant to exercise; forwarded to
     *        {@code JoinTestUtils.setupBuildSide} so that both data-provider values really
     *        run different configurations
     */
    @Test(dataProvider = "hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testInnerJoinWithNullOnBothSides(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();

        // build side: 1, null, null, 1, 2
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row(new Object[]{null})
                .row(new Object[]{null})
                .row(1L)
                .row(2L);
        // The data-provider flag must be forwarded here; without it the parameter is ignored
        // and each combination is merely executed twice with the same lookup source.
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.empty(),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: 1, 2, null, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row(new Object[]{null})
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = JoinTestUtils.innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, 1L)
                .row(1L, 1L)
                .row(2L, 2L)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Probe-outer join of a 15-row probe sequence against a 10-row build sequence: keys
     * "20".."29" are expected to match, keys "30".."34" to come out padded with nulls.
     */
    @Test(dataProvider = "hashJoinTestValues")
    public void testProbeOuterJoin(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = createTaskContext();

        // build side
        List<Type> buildTypes = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .addSequencePage(10, 20, 30, 40);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.empty());
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side
        List<Type> probeTypes = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .addSequencePage(15, 20, 1020, 2020)
                .build();
        OperatorFactory joinOperatorFactory = probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row("20", 1020L, 2020L, "20", 30L, 40L)
                .row("21", 1021L, 2021L, "21", 31L, 41L)
                .row("22", 1022L, 2022L, "22", 32L, 42L)
                .row("23", 1023L, 2023L, "23", 33L, 43L)
                .row("24", 1024L, 2024L, "24", 34L, 44L)
                .row("25", 1025L, 2025L, "25", 35L, 45L)
                .row("26", 1026L, 2026L, "26", 36L, 46L)
                .row("27", 1027L, 2027L, "27", 37L, 47L)
                .row("28", 1028L, 2028L, "28", 38L, 48L)
                .row("29", 1029L, 2029L, "29", 39L, 49L)
                .row("30", 1030L, 2030L, null, null, null)
                .row("31", 1031L, 2031L, null, null, null)
                .row("32", 1032L, 2032L, null, null, null)
                .row("33", 1033L, 2033L, null, null, null)
                .row("34", 1034L, 2034L, null, null, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Probe-outer join with a filter that accepts a pair only when channel 1 of the page passed
     * as {@code rightPage} is at least 1025. Probe rows that fail the filter or have no build
     * match are expected to be emitted with null build columns.
     */
    @Test(dataProvider = "hashJoinTestValues")
    public void testProbeOuterJoinWithFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = createTaskContext();

        JoinTestUtils.TestInternalJoinFilterFunction filterFunction = new JoinTestUtils.TestInternalJoinFilterFunction(
                (leftPosition, leftPage, rightPosition, rightPage) -> {
                    long value = BigintType.BIGINT.getLong(rightPage.getBlock(1), rightPosition);
                    return value >= 1025L;
                });

        // build side
        List<Type> buildTypes = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .addSequencePage(10, 20, 30, 40);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.of(filterFunction));
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side
        List<Type> probeTypes = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .addSequencePage(15, 20, 1020, 2020)
                .build();
        OperatorFactory joinOperatorFactory = probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, true);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row("20", 1020L, 2020L, null, null, null)
                .row("21", 1021L, 2021L, null, null, null)
                .row("22", 1022L, 2022L, null, null, null)
                .row("23", 1023L, 2023L, null, null, null)
                .row("24", 1024L, 2024L, null, null, null)
                .row("25", 1025L, 2025L, "25", 35L, 45L)
                .row("26", 1026L, 2026L, "26", 36L, 46L)
                .row("27", 1027L, 2027L, "27", 37L, 47L)
                .row("28", 1028L, 2028L, "28", 38L, 48L)
                .row("29", 1029L, 2029L, "29", 39L, 49L)
                .row("30", 1030L, 2030L, null, null, null)
                .row("31", 1031L, 2031L, null, null, null)
                .row("32", 1032L, 2032L, null, null, null)
                .row("33", 1033L, 2033L, null, null, null)
                .row("34", 1034L, 2034L, null, null, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Probe-outer join where the probe side contains null keys: each null probe row is expected
     * to be emitted once with a null build column, while keys 1 and 2 match normally.
     */
    @Test(dataProvider = "hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testOuterJoinWithNullProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();

        // build side: 1, 2, 3
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row(2L)
                .row(3L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.empty(),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: 1, null, null, 1, 2
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(new Object[]{null})
                .row(new Object[]{null})
                .row(1L)
                .row(2L)
                .build();
        OperatorFactory joinOperatorFactory = probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, false);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, 1L)
                .row(null, null)
                .row(null, null)
                .row(1L, 1L)
                .row(2L, 2L)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Probe-outer join with null probe keys and a filter that accepts a pair only when channel 0
     * of the page passed as {@code rightPage} equals 1. Probe key 2 finds a build row but fails
     * the filter, so it is expected as (2, null).
     */
    @Test(dataProvider = "hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testOuterJoinWithNullProbeAndFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = createTaskContext();

        JoinTestUtils.TestInternalJoinFilterFunction filterFunction = new JoinTestUtils.TestInternalJoinFilterFunction(
                (leftPosition, leftPage, rightPosition, rightPage) -> {
                    long value = BigintType.BIGINT.getLong(rightPage.getBlock(0), rightPosition);
                    return value == 1L;
                });

        // build side: 1, 2, 3
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row(2L)
                .row(3L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(
                nodePartitioningManager,
                parallelBuild,
                taskContext,
                buildPages,
                Optional.of(filterFunction),
                singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: 1, null, null, 1, 2
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(new Object[]{null})
                .row(new Object[]{null})
                .row(1L)
                .row(2L)
                .build();
        OperatorFactory joinOperatorFactory = probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, true);

        // build the lookup source
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, 1L)
                .row(null, null)
                .row(null, null)
                .row(1L, 1L)
                .row(2L, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, getHashChannels(probePages, buildPages));
    }

    /**
     * Probe-outer join where the build side contains null keys. Expected output: probe key 1 is
     * emitted twice, probe key 2 once, and probe key 3 once with a null build column.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testOuterJoinWithNullBuild(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // build side: keys 1, null, null, 1, 2
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row((Object) null)
                .row((Object) null)
                .row(1L)
                .row(2L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty(), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: keys 1, 2, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, false);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, 1L)
                .row(1L, 1L)
                .row(2L, 2L)
                .row(3L, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        List<Integer> hashChannels = getHashChannels(probePages, buildPages);
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, hashChannels);
    }

    /**
     * Probe-outer join where the build side contains null keys and a join filter accepts only
     * build rows whose key is 1 or 3. Expected output: probe key 1 is emitted twice with build
     * key 1, while probe keys 2 and 3 are emitted with a null build column.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testOuterJoinWithNullBuildAndFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // Join filter: keep a candidate pair only when the build-side key is 1 or 3.
        JoinTestUtils.TestInternalJoinFilterFunction filterFunction = new JoinTestUtils.TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> {
            long buildKey = BigintType.BIGINT.getLong(rightPage.getBlock(0), rightPosition);
            return ImmutableSet.of(1L, 3L).contains(buildKey);
        });

        // build side: keys 1, null, null, 1, 2
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row((Object) null)
                .row((Object) null)
                .row(1L)
                .row(2L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.of(filterFunction), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: keys 1, 2, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, true);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, 1L)
                .row(1L, 1L)
                .row(2L, null)
                .row(3L, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        List<Integer> hashChannels = getHashChannels(probePages, buildPages);
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, hashChannels);
    }

    /**
     * Probe-outer join where both sides contain null keys. The expected output pairs the null
     * probe key with a null build column, i.e. null keys are not expected to match each other.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testOuterJoinWithNullOnBothSides(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // build side: keys 1, null, null, 1, 2
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row((Object) null)
                .row((Object) null)
                .row(1L)
                .row(2L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty(), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: keys 1, 2, null, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row((Object) null)
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, false);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        List<Type> outputTypes = concat(probeTypes, buildPages.getTypesWithoutHash());
        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), outputTypes)
                .row(1L, 1L)
                .row(1L, 1L)
                .row(2L, 2L)
                .row(null, null)
                .row(3L, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        List<Integer> hashChannels = getHashChannels(probePages, buildPages);
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, hashChannels);
    }

    /**
     * Probe-outer join where both sides contain null keys and a join filter accepts only build
     * rows whose key is 1 or 3. Only probe key 1 is expected to keep its build match; every other
     * probe row (2, null, 3) is expected with a null build column.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testOuterJoinWithNullOnBothSidesAndFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // Join filter: keep a candidate pair only when the build-side key is 1 or 3.
        JoinTestUtils.TestInternalJoinFilterFunction filterFunction = new JoinTestUtils.TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> {
            long buildKey = BigintType.BIGINT.getLong(rightPage.getBlock(0), rightPosition);
            return ImmutableSet.of(1L, 3L).contains(buildKey);
        });

        // build side: keys 1, null, null, 1, 2
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row((Object) null)
                .row((Object) null)
                .row(1L)
                .row(2L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.of(filterFunction), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: keys 1, 2, null, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row((Object) null)
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages, true);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        List<Type> outputTypes = concat(probeTypes, buildPages.getTypesWithoutHash());
        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), outputTypes)
                .row(1L, 1L)
                .row(1L, 1L)
                .row(2L, null)
                .row(null, null)
                .row(3L, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        List<Integer> hashChannels = getHashChannels(probePages, buildPages);
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, hashChannels);
    }

    /**
     * Builds the lookup source inside a task context created with a 100-byte memory limit and
     * expects the build to fail with {@link ExceededMemoryLimitException}.
     */
    @Test(dataProvider="testMemoryLimitProvider")
    public void testMemoryLimit(boolean parallelBuild, boolean buildHashEnabled) {
        DataSize memoryLimit = DataSize.ofBytes(100);
        TaskContext taskContext = TestingTaskContext.createTaskContext(this.executor, this.scheduledExecutor, SessionTestUtils.TEST_SESSION, memoryLimit);

        // build side: one sequence page of 10 rows over (VARCHAR, BIGINT, BIGINT)
        List<Type> buildTypes = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT, BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .addSequencePage(10, 20, 30, 40);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty());
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);

        Assertions.assertThatThrownBy(() -> JoinTestUtils.buildLookupSource(this.executor, buildSideSetup))
                .isInstanceOf(ExceededMemoryLimitException.class)
                .hasMessageMatching("Query exceeded per-node memory limit of.*");
    }

    /**
     * Inner join against a build side with no rows: after feeding a single probe page the
     * operator is expected to return no output page.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testInnerJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // build side: no rows
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty(), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();

        // probe side
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        OperatorFactory joinOperatorFactory = this.operatorFactories.join(
                OperatorFactories.JoinOperatorType.innerJoin(false, false),
                0,
                new PlanNodeId("test"),
                lookupSourceFactoryManager,
                false,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        Operator operator = joinOperatorFactory.createOperator(driverContext);

        // feed one probe row and check that nothing comes out
        List<Page> pages = probePages.row(1L).build();
        Page firstProbePage = pages.get(0);
        operator.addInput(firstProbePage);
        Assert.assertNull(operator.getOutput());
    }

    /**
     * Lookup-outer join against a build side with no rows: after feeding a single probe page the
     * operator is expected to return no output page.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testLookupOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // build side: no rows
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty(), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();

        // probe side
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        OperatorFactory joinOperatorFactory = this.operatorFactories.join(
                OperatorFactories.JoinOperatorType.lookupOuterJoin(false),
                0,
                new PlanNodeId("test"),
                lookupSourceFactoryManager,
                false,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        Operator operator = joinOperatorFactory.createOperator(driverContext);

        // feed one probe row and check that nothing comes out
        List<Page> pages = probePages.row(1L).build();
        Page firstProbePage = pages.get(0);
        operator.addInput(firstProbePage);
        Assert.assertNull(operator.getOutput());
    }

    /**
     * Probe-outer join against a build side with no rows: every probe row (including the one with
     * a null key) is expected in the output with a null build column.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testProbeOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // build side: no rows
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty(), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: keys 1, 2, null, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row((Object) null)
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = this.operatorFactories.join(
                OperatorFactories.JoinOperatorType.probeOuterJoin(false),
                0,
                new PlanNodeId("test"),
                lookupSourceFactoryManager,
                false,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, null)
                .row(2L, null)
                .row(null, null)
                .row(3L, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        List<Integer> hashChannels = getHashChannels(probePages, buildPages);
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, hashChannels);
    }

    /**
     * Full-outer join against a build side with no rows: every probe row (including the one with
     * a null key) is expected in the output with a null build column.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testFullOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // build side: no rows
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty(), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: keys 1, 2, null, 3
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages
                .row(1L)
                .row(2L)
                .row((Object) null)
                .row(3L)
                .build();
        OperatorFactory joinOperatorFactory = this.operatorFactories.join(
                OperatorFactories.JoinOperatorType.fullOuterJoin(),
                0,
                new PlanNodeId("test"),
                lookupSourceFactoryManager,
                false,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes))
                .row(1L, null)
                .row(2L, null)
                .row(null, null)
                .row(3L, null)
                .build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        List<Integer> hashChannels = getHashChannels(probePages, buildPages);
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, hashChannels);
    }

    /**
     * Inner join with a populated build side (keys 1, 2, null, 3) and a probe side that has no
     * rows: the expected result is empty.
     */
    @Test(dataProvider="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public void testInnerJoinWithNonEmptyLookupSourceAndEmptyProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean singleBigintLookupSource) {
        TaskContext taskContext = this.createTaskContext();

        // build side: keys 1, 2, null, 3
        List<Type> buildTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes)
                .row(1L)
                .row(2L)
                .row((Object) null)
                .row(3L);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty(), singleBigintLookupSource);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();

        // probe side: no rows
        List<Type> probeTypes = ImmutableList.of(BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        List<Page> probeInput = probePages.build();
        OperatorFactory joinOperatorFactory = this.operatorFactories.join(
                OperatorFactories.JoinOperatorType.innerJoin(false, false),
                0,
                new PlanNodeId("test"),
                lookupSourceFactoryManager,
                false,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // run the build side to completion
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);

        // no rows expected
        MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probeTypes, buildTypes)).build();

        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        List<Integer> hashChannels = getHashChannels(probePages, buildPages);
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, driverContext, probeInput, expected, true, hashChannels);
    }

    /**
     * Checks how an inner-join operator behaves when the probe side is finished without any input
     * while the lookup source has not been built (the factory helper instantiates the build
     * drivers but never builds the lookup source).
     *
     * <p>Two scenarios are exercised, differing only in the {@code waitForBuild} flag passed to
     * {@code createJoinOperatorFactoryWithBlockingLookupSource}:
     * <ul>
     * <li>{@code true}: the operator does not accept input, stays blocked and does not finish;</li>
     * <li>{@code false}: the operator accepts input, is not blocked and finishes once
     * {@code finish()} has been called and output has been polled.</li>
     * </ul>
     *
     * @throws Exception if closing a join operator fails
     */
    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoinWithBlockingLookupSourceAndEmptyProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) throws Exception {
        // Scenario 1: join operator created with waitForBuild = true.
        TaskContext taskContext = this.createTaskContext();
        OperatorFactory joinOperatorFactory = this.createJoinOperatorFactoryWithBlockingLookupSource(taskContext, parallelBuild, probeHashEnabled, buildHashEnabled, true);
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        try (Operator joinOperator = joinOperatorFactory.createOperator(driverContext)) {
            joinOperatorFactory.noMoreOperators();
            Assert.assertFalse(joinOperator.needsInput());
            joinOperator.finish();
            Assert.assertNull(joinOperator.getOutput());
            // still blocked and unfinished because the lookup source was never built
            Assert.assertFalse(joinOperator.isBlocked().isDone());
            Assert.assertFalse(joinOperator.isFinished());
        }

        // Scenario 2: join operator created with waitForBuild = false.
        taskContext = this.createTaskContext();
        joinOperatorFactory = this.createJoinOperatorFactoryWithBlockingLookupSource(taskContext, parallelBuild, probeHashEnabled, buildHashEnabled, false);
        driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        // The operator is declared in its own try-with-resources; the previous declaration went
        // out of scope with the first try block.
        try (Operator joinOperator = joinOperatorFactory.createOperator(driverContext)) {
            joinOperatorFactory.noMoreOperators();
            Assert.assertTrue(joinOperator.needsInput());
            joinOperator.finish();
            // output is polled twice before the finished state is checked
            Assert.assertNull(joinOperator.getOutput());
            Assert.assertNull(joinOperator.getOutput());
            Assert.assertTrue(joinOperator.isBlocked().isDone());
            Assert.assertTrue(joinOperator.isFinished());
        }
    }

    /**
     * Checks how an inner-join operator behaves while the lookup source has not been built (the
     * factory helper instantiates the build drivers but never builds the lookup source).
     *
     * <p>Two scenarios are exercised, differing only in the {@code waitForBuild} flag passed to
     * {@code createJoinOperatorFactoryWithBlockingLookupSource}:
     * <ul>
     * <li>{@code true}: the operator does not accept input and is blocked from the start;</li>
     * <li>{@code false}: the operator accepts one probe page while unblocked, and only becomes
     * blocked after that page has been added.</li>
     * </ul>
     *
     * @throws Exception if closing a join operator fails
     */
    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoinWithBlockingLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) throws Exception {
        // A single one-row VARCHAR probe page used by the second scenario.
        List<Type> probeTypes = ImmutableList.of(VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        Page probePage = Iterables.getOnlyElement(probePages.addSequencePage(1, 0).build());

        // Scenario 1: join operator created with waitForBuild = true.
        TaskContext taskContext = this.createTaskContext();
        OperatorFactory joinOperatorFactory = this.createJoinOperatorFactoryWithBlockingLookupSource(taskContext, parallelBuild, probeHashEnabled, buildHashEnabled, true);
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        try (Operator joinOperator = joinOperatorFactory.createOperator(driverContext)) {
            joinOperatorFactory.noMoreOperators();
            Assert.assertFalse(joinOperator.needsInput());
            Assert.assertNull(joinOperator.getOutput());
            Assert.assertFalse(joinOperator.isBlocked().isDone());
            Assert.assertFalse(joinOperator.isFinished());
        }

        // Scenario 2: join operator created with waitForBuild = false.
        taskContext = this.createTaskContext();
        joinOperatorFactory = this.createJoinOperatorFactoryWithBlockingLookupSource(taskContext, parallelBuild, probeHashEnabled, buildHashEnabled, false);
        driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        // The operator is declared in its own try-with-resources; the previous declaration went
        // out of scope with the first try block.
        try (Operator joinOperator = joinOperatorFactory.createOperator(driverContext)) {
            joinOperatorFactory.noMoreOperators();
            Assert.assertTrue(joinOperator.needsInput());
            Assert.assertNull(joinOperator.getOutput());
            Assert.assertTrue(joinOperator.isBlocked().isDone());
            Assert.assertFalse(joinOperator.isFinished());

            // once a probe page has been added the operator is expected to block
            joinOperator.addInput(probePage);
            Assert.assertNull(joinOperator.getOutput());
            Assert.assertFalse(joinOperator.isBlocked().isDone());
            Assert.assertFalse(joinOperator.isFinished());
        }
    }

    /**
     * Drives an inner join as a {@code WorkProcessor} pipeline with lazily loaded probe blocks and
     * checks the order in which those blocks get loaded: the loader of the third probe channel
     * asserts that, at the moment it runs, no probe page newer than its own has been pulled.
     *
     * <p>The build side holds 100000 rows with the key "a" and every probe page holds a single
     * row with the key "a"; the final assertions require more output pages than probe pages.
     */
    @Test
    public void testInnerJoinLoadsPagesInOrder() {
        TaskContext taskContext = this.createTaskContext();
        // build side: a single VARCHAR channel, 100000 rows that all carry the key "a"
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(false, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes);
        for (int i = 0; i < 100000; ++i) {
            buildPages.row("a");
        }
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, false, taskContext, buildPages, Optional.empty());
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        // probe side: (VARCHAR, INTEGER, INTEGER) with one template row ("a", 1, 2)
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR, (Object)IntegerType.INTEGER, (Object)IntegerType.INTEGER);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(false, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        probePages.row("a", 1L, 2L);
        WorkProcessorOperatorFactory joinOperatorFactory = (WorkProcessorOperatorFactory)JoinTestUtils.innerJoinOperatorFactory(this.operatorFactories, lookupSourceFactory, probePages, false);
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinTestUtils.buildLookupSource(this.executor, buildSideSetup);
        Page probePage = (Page)Iterables.getOnlyElement(probePages.build());
        // counts how many times the probe page source has been asked for a page
        AtomicInteger totalProbePages = new AtomicInteger();
        WorkProcessor inputPages = WorkProcessor.create(() -> {
            int probePageNumber = totalProbePages.incrementAndGet();
            // the fifth request ends the probe stream, so four probe pages are produced
            if (probePageNumber == 5) {
                return WorkProcessor.ProcessState.finished();
            }
            // channel 0 is passed as-is; channels 1 and 2 are wrapped in lazy blocks
            return WorkProcessor.ProcessState.ofResult((Object)new Page(1, new Block[]{probePage.getBlock(0), new LazyBlock(1, () -> probePage.getBlock(1)), new LazyBlock(1, () -> {
                // ordering check: this page must still be the most recently produced probe page
                Assert.assertEquals((int)probePageNumber, (int)totalProbePages.get());
                return probePage.getBlock(2);
            })}));
        });
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        OperatorContext operatorContext = driverContext.addOperatorContext(joinOperatorFactory.getOperatorId(), joinOperatorFactory.getPlanNodeId(), joinOperatorFactory.getOperatorType());
        WorkProcessorOperator joinOperator = joinOperatorFactory.create(new ProcessorContext(taskContext.getSession(), taskContext.getTaskMemoryContext(), operatorContext), inputPages);
        WorkProcessor outputPages = joinOperator.getOutputPages();
        int totalOutputPages = 0;
        // bounded polling loop so that the test cannot spin forever
        for (int i = 0; i < 1000000; ++i) {
            if (!outputPages.process()) {
                // no progress on this call (presumably because of the forced yield below): clear the yield signal and retry
                driverContext.getYieldSignal().resetYieldForTesting();
                continue;
            }
            if (outputPages.isFinished()) break;
            Page page = (Page)outputPages.getResult();
            ++totalOutputPages;
            // channel 1 of the output page must not have been loaded by the join
            Assert.assertFalse((boolean)page.getBlock(1).isLoaded());
            // loading channel 2 presumably runs the asserting loader above - TODO confirm output channel mapping
            page.getBlock(2).getLoadedBlock();
            // force a yield after every output page
            driverContext.getYieldSignal().forceYieldForTesting();
        }
        Assert.assertTrue((totalOutputPages > totalProbePages.get() ? 1 : 0) != 0);
        Assert.assertTrue((boolean)outputPages.isFinished());
    }

    /**
     * Creates an inner-join operator factory over single-channel VARCHAR build and probe sides.
     * The build side has no rows; its build drivers are instantiated but this helper never
     * builds the lookup source.
     *
     * @param taskContext task context the build side is set up in
     * @param parallelBuild forwarded to {@code JoinTestUtils.setupBuildSide}
     * @param probeHashEnabled whether the probe pages builder is created with hashing enabled
     * @param buildHashEnabled whether the build pages builder is created with hashing enabled
     * @param waitForBuild forwarded as the second argument of {@code JoinOperatorType.innerJoin}
     * @return the join operator factory
     */
    private OperatorFactory createJoinOperatorFactoryWithBlockingLookupSource(TaskContext taskContext, boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled, boolean waitForBuild) {
        // build side
        List<Type> buildTypes = ImmutableList.of(VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, Ints.asList(0), buildTypes);
        JoinTestUtils.BuildSideSetup buildSideSetup = JoinTestUtils.setupBuildSide(this.nodePartitioningManager, parallelBuild, taskContext, buildPages, Optional.empty());
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();

        // probe side
        List<Type> probeTypes = ImmutableList.of(VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes);
        OperatorFactory joinOperatorFactory = this.operatorFactories.join(
                OperatorFactories.JoinOperatorType.innerJoin(false, waitForBuild),
                0,
                new PlanNodeId("test"),
                lookupSourceFactoryManager,
                false,
                probePages.getTypes(),
                Ints.asList(0),
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);

        // build drivers are created, but the lookup source is deliberately not built here
        JoinTestUtils.instantiateBuildDrivers(buildSideSetup, taskContext);
        return joinOperatorFactory;
    }

    /**
     * Data provider built from the cartesian product of three {@code {true, false}} axes; the
     * tests using it name these parameters parallelBuild, probeHashEnabled and buildHashEnabled.
     */
    @DataProvider(name="hashJoinTestValues")
    public static Object[][] hashJoinTestValuesProvider() {
        Object[][] parallelBuild = {{true}, {false}};
        Object[][] probeHashEnabled = {{true}, {false}};
        Object[][] buildHashEnabled = {{true}, {false}};
        Object[][][] axes = {parallelBuild, probeHashEnabled, buildHashEnabled};
        return DataProviders.cartesianProduct(axes);
    }

    /**
     * Data provider built from the cartesian product of two {@code {true, false}} axes; the
     * test using it names these parameters parallelBuild and buildHashEnabled.
     */
    @DataProvider
    public static Object[][] testMemoryLimitProvider() {
        Object[][] parallelBuild = {{true}, {false}};
        Object[][] buildHashEnabled = {{true}, {false}};
        Object[][][] axes = {parallelBuild, buildHashEnabled};
        return DataProviders.cartesianProduct(axes);
    }

    /** Data provider with two single-argument rows: {@code true} and {@code false}. */
    @DataProvider(name="singleBigintLookupSourceProvider")
    public static Object[][] singleBigintLookupSourceProvider() {
        Object[] enabled = {true};
        Object[] disabled = {false};
        return new Object[][]{enabled, disabled};
    }

    /**
     * Data provider built from the cartesian product of four {@code {true, false}} axes; the
     * tests using it name these parameters parallelBuild, probeHashEnabled, buildHashEnabled and
     * singleBigintLookupSource.
     */
    @DataProvider(name="hashJoinTestValuesAndsingleBigintLookupSourceProvider")
    public static Object[][] hashJoinTestValuesAndsingleBigintLookupSourceProvider() {
        Object[][] parallelBuild = {{true}, {false}};
        Object[][] probeHashEnabled = {{true}, {false}};
        Object[][] buildHashEnabled = {{true}, {false}};
        Object[][] singleBigintLookupSource = {{true}, {false}};
        Object[][][] axes = {parallelBuild, probeHashEnabled, buildHashEnabled, singleBigintLookupSource};
        return DataProviders.cartesianProduct(axes);
    }

    /**
     * Creates a fresh testing task context bound to this test's executors and the shared
     * {@code TEST_SESSION}.
     */
    private TaskContext createTaskContext() {
        TaskContext context = TestingTaskContext.createTaskContext(this.executor, this.scheduledExecutor, SessionTestUtils.TEST_SESSION);
        return context;
    }

    /**
     * Collects the hash channel indexes of both join sides.
     *
     * <p>The probe hash channel, when present, is added unchanged. The build hash channel, when
     * present, is added shifted by the number of probe types.
     *
     * @param probe probe-side pages builder
     * @param build build-side pages builder
     * @return immutable list with zero, one or two channel indexes (probe first, then build)
     */
    private static List<Integer> getHashChannels(RowPagesBuilder probe, RowPagesBuilder build) {
        ImmutableList.Builder<Integer> channels = ImmutableList.builder();
        probe.getHashChannel().ifPresent(channels::add);
        build.getHashChannel().ifPresent(buildHashChannel -> channels.add(probe.getTypes().size() + buildHashChannel));
        return channels.build();
    }

    /**
     * Creates a probe-outer join operator factory that joins on probe channel 0.
     *
     * @param lookupSourceFactoryManager bridge to the build side's lookup source factory
     * @param probePages probe pages builder supplying the probe types and optional hash channel
     * @param hasFilter passed through to {@code operatorFactories.join} unchanged
     * @return the join operator factory
     */
    private OperatorFactory probeOuterJoinOperatorFactory(JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager, RowPagesBuilder probePages, boolean hasFilter) {
        PlanNodeId planNodeId = new PlanNodeId("test");
        List<Type> probeTypes = probePages.getTypes();
        List<Integer> probeJoinChannels = Ints.asList(0);
        return this.operatorFactories.join(
                OperatorFactories.JoinOperatorType.probeOuterJoin(false),
                0,
                planNodeId,
                lookupSourceFactoryManager,
                hasFilter,
                probeTypes,
                probeJoinChannels,
                JoinTestUtils.getHashChannelAsInt(probePages),
                Optional.empty(),
                TYPE_OPERATOR_FACTORY);
    }

    /**
     * Returns an immutable list holding the elements of {@code initialElements} followed by the
     * elements of {@code moreElements}.
     */
    private static <T> List<T> concat(List<T> initialElements, List<T> moreElements) {
        ImmutableList.Builder<T> combined = ImmutableList.builder();
        combined.addAll(initialElements);
        combined.addAll(moreElements);
        return combined.build();
    }
}

