/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.concurrent.MoreFutures;
import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.testing.Assertions;
import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.RowPagesBuilder;
import com.facebook.presto.Session;
import com.facebook.presto.SessionTestUtils;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.Driver;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.HashBuilderOperator;
import com.facebook.presto.operator.InternalJoinFilterFunction;
import com.facebook.presto.operator.JoinBridgeManager;
import com.facebook.presto.operator.LookupJoinOperators;
import com.facebook.presto.operator.LookupSourceFactory;
import com.facebook.presto.operator.LookupSourceProvider;
import com.facebook.presto.operator.Operator;
import com.facebook.presto.operator.OperatorAssertion;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.operator.PartitionedLookupSourceFactory;
import com.facebook.presto.operator.PipelineContext;
import com.facebook.presto.operator.PipelineExecutionStrategy;
import com.facebook.presto.operator.SpillContext;
import com.facebook.presto.operator.StandardJoinFilterFunction;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.ValuesOperator;
import com.facebook.presto.operator.exchange.LocalExchange;
import com.facebook.presto.operator.exchange.LocalExchangeSinkOperator;
import com.facebook.presto.operator.exchange.LocalExchangeSourceOperator;
import com.facebook.presto.operator.index.PageBuffer;
import com.facebook.presto.operator.index.PageBufferOperator;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spiller.GenericPartitioningSpillerFactory;
import com.facebook.presto.spiller.PartitioningSpillerFactory;
import com.facebook.presto.spiller.SingleStreamSpiller;
import com.facebook.presto.spiller.SingleStreamSpillerFactory;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.SystemPartitioningHandle;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.TestingSession;
import com.facebook.presto.testing.TestingTaskContext;
import com.facebook.presto.testing.assertions.Assert;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestHashJoinOperator {
    private static final int PARTITION_COUNT = 4;
    private static final LookupJoinOperators LOOKUP_JOIN_OPERATORS = new LookupJoinOperators();
    private static final SingleStreamSpillerFactory SINGLE_STREAM_SPILLER_FACTORY = new DummySpillerFactory();
    private static final PartitioningSpillerFactory PARTITIONING_SPILLER_FACTORY = new GenericPartitioningSpillerFactory(SINGLE_STREAM_SPILLER_FACTORY);
    private static final String PAGE_BUFFER = "PageBuffer";
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;
    private PartitioningProviderManager partitioningProviderManager;
    private Session session;

    @BeforeMethod
    public void setUp() {
        this.executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.daemonThreadsNamed((String)"test-executor-%s"), new ThreadPoolExecutor.DiscardPolicy());
        this.scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed((String)"test-scheduledExecutor-%s"));
        this.partitioningProviderManager = new PartitioningProviderManager();
        this.session = TestingSession.testSessionBuilder().build();
    }

    @AfterMethod(alwaysRun=true)
    public void tearDown() {
        this.executor.shutdownNow();
        this.scheduledExecutor.shutdownNow();
        this.partitioningProviderManager = null;
        this.session = null;
    }

    @DataProvider(name="hashJoinTestValues")
    public static Object[][] hashJoinTestValuesProvider() {
        return new Object[][]{{true, true, true}, {true, true, false}, {true, false, true}, {true, false, false}, {false, true, true}, {false, true, false}, {false, false, true}, {false, false, false}};
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoin(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT)).addSequencePage(10, 20, 30, 40);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT));
        List<Page> probeInput = probePages.addSequencePage(1000, 0, 1000, 2000).build();
        OperatorFactory joinOperatorFactory = this.innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probePages.getTypesWithoutHash(), buildPages.getTypesWithoutHash())).row(new Object[]{"20", 1020L, 2020L, "20", 30L, 40L}).row(new Object[]{"21", 1021L, 2021L, "21", 31L, 41L}).row(new Object[]{"22", 1022L, 2022L, "22", 32L, 42L}).row(new Object[]{"23", 1023L, 2023L, "23", 33L, 43L}).row(new Object[]{"24", 1024L, 2024L, "24", 34L, 44L}).row(new Object[]{"25", 1025L, 2025L, "25", 35L, 45L}).row(new Object[]{"26", 1026L, 2026L, "26", 36L, 46L}).row(new Object[]{"27", 1027L, 2027L, "27", 37L, 47L}).row(new Object[]{"28", 1028L, 2028L, "28", 38L, 48L}).row(new Object[]{"29", 1029L, 2029L, "29", 39L, 49L}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test
    public void testYield() {
        TaskContext taskContext = this.createTaskContext();
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        AtomicInteger filterFunctionCalls = new AtomicInteger();
        TestInternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> {
            filterFunctionCalls.incrementAndGet();
            driverContext.getYieldSignal().forceYieldForTesting();
            return true;
        });
        int entries = 40;
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(true, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)BigintType.BIGINT)).addSequencePage(entries, 42);
        BuildSideSetup buildSideSetup = this.setupBuildSide(true, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.of(filterFunction), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(false, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)BigintType.BIGINT));
        List<Page> probeInput = probePages.addSequencePage(100, 0).build();
        OperatorFactory joinOperatorFactory = LOOKUP_JOIN_OPERATORS.innerJoin(0, new PlanNodeId("test"), lookupSourceFactory, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), OptionalInt.of(1), PARTITIONING_SPILLER_FACTORY, false);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        Operator operator = joinOperatorFactory.createOperator(driverContext);
        org.testng.Assert.assertTrue((boolean)operator.needsInput());
        operator.addInput(probeInput.get(0));
        operator.finish();
        for (int i = 0; i < entries; ++i) {
            driverContext.getYieldSignal().setWithDelay(5L * TimeUnit.SECONDS.toNanos(1L), driverContext.getYieldExecutor());
            filterFunctionCalls.set(0);
            org.testng.Assert.assertNull((Object)operator.getOutput());
            Assert.assertEquals((int)filterFunctionCalls.get(), (int)1, (String)"Expected join to stop processing (yield) after calling filter function once");
            driverContext.getYieldSignal().reset();
        }
        driverContext.getYieldSignal().setWithDelay(5L * TimeUnit.SECONDS.toNanos(1L), driverContext.getYieldExecutor());
        Page output = null;
        for (int i = 0; output == null && i < 5; ++i) {
            output = operator.getOutput();
        }
        org.testng.Assert.assertNotNull(output);
        driverContext.getYieldSignal().reset();
        Assert.assertEquals((int)output.getPositionCount(), (int)entries);
    }

    @DataProvider
    public Object[][] joinWithSpillValues() {
        ImmutableList dictionaryProcessingValues = ImmutableList.of((Object)ImmutableList.of((Object)true), (Object)ImmutableList.of((Object)false));
        return (Object[][])TestHashJoinOperator.product(TestHashJoinOperator.joinWithSpillParameters(true), dictionaryProcessingValues).stream().map(List::toArray).toArray(x$0 -> new Object[x$0][]);
    }

    @DataProvider
    public Object[][] joinWithFailingSpillValues() {
        ImmutableList dictionaryProcessingValues = ImmutableList.of((Object)ImmutableList.of((Object)true), (Object)ImmutableList.of((Object)false));
        List spillFailValues = Arrays.stream(WhenSpillFails.values()).map(ImmutableList::of).collect(Collectors.toList());
        return (Object[][])TestHashJoinOperator.product(TestHashJoinOperator.product(TestHashJoinOperator.joinWithSpillParameters(false), spillFailValues), dictionaryProcessingValues).stream().map(List::toArray).toArray(x$0 -> new Object[x$0][]);
    }

    private static List<List<Object>> joinWithSpillParameters(boolean allowNoSpill) {
        ArrayList<List<Object>> result = new ArrayList<List<Object>>();
        UnmodifiableIterator unmodifiableIterator = ImmutableList.of((Object)false, (Object)true).iterator();
        while (unmodifiableIterator.hasNext()) {
            boolean probeHashEnabled = (Boolean)unmodifiableIterator.next();
            for (WhenSpill whenSpill : WhenSpill.values()) {
                if (allowNoSpill || whenSpill != WhenSpill.NEVER) {
                    result.add((List<Object>)ImmutableList.of((Object)probeHashEnabled, Collections.nCopies(4, whenSpill)));
                }
                if (whenSpill == WhenSpill.NEVER) continue;
                result.add((List<Object>)ImmutableList.of((Object)probeHashEnabled, TestHashJoinOperator.concat(Collections.singletonList(whenSpill), Collections.nCopies(3, WhenSpill.NEVER))));
            }
            result.add((List<Object>)ImmutableList.of((Object)probeHashEnabled, TestHashJoinOperator.concat(Arrays.asList(WhenSpill.DURING_BUILD, WhenSpill.AFTER_BUILD), Collections.nCopies(2, WhenSpill.NEVER))));
            result.add((List<Object>)ImmutableList.of((Object)probeHashEnabled, TestHashJoinOperator.concat(Arrays.asList(WhenSpill.DURING_BUILD, WhenSpill.DURING_USAGE), Collections.nCopies(2, WhenSpill.NEVER))));
        }
        return result;
    }

    @Test(dataProvider="joinWithSpillValues")
    public void testInnerJoinWithSpill(boolean probeHashEnabled, List<WhenSpill> whenSpill, boolean isDictionaryProcessingJoinEnabled) throws Exception {
        this.innerJoinWithSpill(probeHashEnabled, whenSpill, SINGLE_STREAM_SPILLER_FACTORY, PARTITIONING_SPILLER_FACTORY);
    }

    @Test(dataProvider="joinWithFailingSpillValues")
    public void testInnerJoinWithFailingSpill(boolean probeHashEnabled, List<WhenSpill> whenSpill, WhenSpillFails whenSpillFails, boolean isDictionaryProcessingJoinEnabled) throws Throwable {
        String expectedMessage;
        DummySpillerFactory buildSpillerFactory = new DummySpillerFactory();
        DummySpillerFactory joinSpillerFactory = new DummySpillerFactory();
        GenericPartitioningSpillerFactory partitioningSpillerFactory = new GenericPartitioningSpillerFactory((SingleStreamSpillerFactory)joinSpillerFactory);
        switch (whenSpillFails) {
            case SPILL_BUILD: {
                buildSpillerFactory.failSpill();
                expectedMessage = "Spill failed";
                break;
            }
            case SPILL_JOIN: {
                joinSpillerFactory.failSpill();
                expectedMessage = "Spill failed";
                break;
            }
            case UNSPILL_BUILD: {
                buildSpillerFactory.failUnspill();
                expectedMessage = "Unspill failed";
                break;
            }
            case UNSPILL_JOIN: {
                joinSpillerFactory.failUnspill();
                expectedMessage = "Unspill failed";
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unsupported option: %s", new Object[]{whenSpillFails}));
            }
        }
        try {
            this.innerJoinWithSpill(probeHashEnabled, whenSpill, buildSpillerFactory, (PartitioningSpillerFactory)partitioningSpillerFactory);
            org.testng.Assert.fail((String)"Exception not thrown");
        }
        catch (RuntimeException exception) {
            Assert.assertEquals((String)exception.getMessage(), (String)expectedMessage);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void innerJoinWithSpill(boolean probeHashEnabled, List<WhenSpill> whenSpill, SingleStreamSpillerFactory buildSpillerFactory, PartitioningSpillerFactory joinSpillerFactory) throws Exception {
        TaskStateMachine taskStateMachine = new TaskStateMachine(new TaskId("query", 0, 0, 0, 0), (Executor)this.executor);
        TaskContext taskContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)SessionTestUtils.TEST_SESSION, (TaskStateMachine)taskStateMachine);
        DriverContext joinDriverContext = taskContext.addPipelineContext(2, true, true, false).addDriverContext();
        AtomicBoolean called = new AtomicBoolean(false);
        TestInternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> {
            called.set(true);
            joinDriverContext.getYieldSignal().forceYieldForTesting();
            return true;
        });
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder((Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT)).addSequencePage(4, 20, 200).addSequencePage(4, 20, 200).addSequencePage(4, 30, 300).addSequencePage(4, 40, 400);
        BuildSideSetup buildSideSetup = this.setupBuildSide(true, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.of(filterFunction), true, buildSpillerFactory);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT)).row("20", 123000L).row("20", 123000L).pageBreak().addSequencePage(20, 0, 123000).addSequencePage(10, 30, 123000);
        OperatorFactory joinOperatorFactory = this.innerJoinOperatorFactory(lookupSourceFactoryManager, probePages, joinSpillerFactory);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        List<Driver> buildDrivers = buildSideSetup.getBuildDrivers();
        int buildOperatorCount = buildDrivers.size();
        Preconditions.checkState((buildOperatorCount == whenSpill.size() ? 1 : 0) != 0);
        LookupSourceFactory lookupSourceFactory = (LookupSourceFactory)lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide());
        try (Operator joinOperator = joinOperatorFactory.createOperator(joinDriverContext);){
            int i;
            ListenableFuture lookupSourceProvider = lookupSourceFactory.createLookupSourceProvider();
            ArrayList<Boolean> revoked = new ArrayList<Boolean>(Collections.nCopies(buildOperatorCount, false));
            while (!lookupSourceProvider.isDone()) {
                for (i = 0; i < buildOperatorCount; ++i) {
                    TestHashJoinOperator.checkErrors(taskStateMachine);
                    buildDrivers.get(i).process();
                    HashBuilderOperator buildOperator = buildSideSetup.getBuildOperators().get(i);
                    if (whenSpill.get(i) != WhenSpill.DURING_BUILD || buildOperator.getOperatorContext().getReservedRevocableBytes() <= 0L) continue;
                    Preconditions.checkState((!lookupSourceProvider.isDone() ? 1 : 0) != 0, (Object)"Too late, LookupSource already done");
                    TestHashJoinOperator.revokeMemory(buildOperator);
                    revoked.set(i, true);
                }
            }
            ((LookupSourceProvider)MoreFutures.getFutureValue((Future)lookupSourceProvider)).close();
            Assert.assertEquals(revoked, (Collection)((Collection)whenSpill.stream().map(WhenSpill.DURING_BUILD::equals).collect(ImmutableList.toImmutableList())), (String)"Some operators not spilled before LookupSource built");
            for (i = 0; i < buildOperatorCount; ++i) {
                if (whenSpill.get(i) != WhenSpill.AFTER_BUILD) continue;
                TestHashJoinOperator.revokeMemory(buildSideSetup.getBuildOperators().get(i));
            }
            for (Driver buildDriver : buildDrivers) {
                TestHashJoinOperator.runDriverInThread(this.executor, buildDriver);
            }
            ValuesOperator.ValuesOperatorFactory valuesOperatorFactory = new ValuesOperator.ValuesOperatorFactory(17, new PlanNodeId("values"), probePages.build());
            PageBuffer pageBuffer = new PageBuffer(10);
            PageBufferOperator.PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperator.PageBufferOperatorFactory(18, new PlanNodeId(PAGE_BUFFER), pageBuffer, PAGE_BUFFER);
            Driver joinDriver = Driver.createDriver((DriverContext)joinDriverContext, (Operator)valuesOperatorFactory.createOperator(joinDriverContext), (Operator[])new Operator[]{joinOperator, pageBufferOperatorFactory.createOperator(joinDriverContext)});
            while (!called.get()) {
                TestHashJoinOperator.processRow(joinDriver, taskStateMachine);
            }
            for (int i2 = 0; i2 < buildOperatorCount; ++i2) {
                if (whenSpill.get(i2) != WhenSpill.DURING_USAGE) continue;
                TestHashJoinOperator.triggerMemoryRevokingAndWait(buildSideSetup.getBuildOperators().get(i2), taskStateMachine);
            }
            while (!joinDriver.isFinished()) {
                TestHashJoinOperator.checkErrors(taskStateMachine);
                TestHashJoinOperator.processRow(joinDriver, taskStateMachine);
            }
            TestHashJoinOperator.checkErrors(taskStateMachine);
            List<Page> actualPages = TestHashJoinOperator.getPages(pageBuffer);
            MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probePages.getTypesWithoutHash(), buildPages.getTypesWithoutHash())).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"30", 123000L, "30", 300L}).row(new Object[]{"31", 123001L, "31", 301L}).row(new Object[]{"32", 123002L, "32", 302L}).row(new Object[]{"33", 123003L, "33", 303L}).build();
            Assertions.assertEqualsIgnoreOrder((Iterable)TestHashJoinOperator.getProperColumns(joinOperator, TestHashJoinOperator.concat(probePages.getTypes(), buildPages.getTypes()), probePages, actualPages).getMaterializedRows(), (Iterable)expected.getMaterializedRows());
        }
        finally {
            joinOperatorFactory.noMoreOperators();
        }
    }

    @Test(timeOut=60000L)
    public void testInnerJoinWithSpillWithEarlyTermination() {
        TaskStateMachine taskStateMachine = new TaskStateMachine(new TaskId("query", 0, 0, 0, 0), (Executor)this.executor);
        TaskContext taskContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)SessionTestUtils.TEST_SESSION, (TaskStateMachine)taskStateMachine);
        PipelineContext joinPipelineContext = taskContext.addPipelineContext(2, true, true, false);
        DriverContext joinDriverContext1 = joinPipelineContext.addDriverContext();
        DriverContext joinDriverContext2 = joinPipelineContext.addDriverContext();
        DriverContext joinDriverContext3 = joinPipelineContext.addDriverContext();
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder((Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT)).addSequencePage(4, 20, 200).addSequencePage(4, 20, 200).addSequencePage(4, 30, 300).addSequencePage(4, 40, 400);
        AtomicBoolean called = new AtomicBoolean(false);
        TestInternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> {
            called.set(true);
            return true;
        });
        BuildSideSetup buildSideSetup = this.setupBuildSide(true, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.of(filterFunction), true, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        RowPagesBuilder probe1Pages = RowPagesBuilder.rowPagesBuilder(true, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT)).row("no_match_1", 123000L).row("no_match_2", 123000L);
        RowPagesBuilder probe2Pages = RowPagesBuilder.rowPagesBuilder(true, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT)).row("20", 123000L).row("20", 123000L).pageBreak().addSequencePage(20, 0, 123000).addSequencePage(10, 30, 123000);
        OperatorFactory joinOperatorFactory = this.innerJoinOperatorFactory(lookupSourceFactoryManager, probe2Pages, PARTITIONING_SPILLER_FACTORY, OptionalInt.of(3));
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        List<Driver> buildDrivers = buildSideSetup.getBuildDrivers();
        int buildOperatorCount = buildDrivers.size();
        LookupSourceFactory lookupSourceFactory = (LookupSourceFactory)lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide());
        Operator lookupOperator1 = joinOperatorFactory.createOperator(joinDriverContext1);
        Operator lookupOperator2 = joinOperatorFactory.createOperator(joinDriverContext2);
        Operator lookupOperator3 = joinOperatorFactory.createOperator(joinDriverContext3);
        joinOperatorFactory.noMoreOperators();
        ListenableFuture lookupSourceProvider = lookupSourceFactory.createLookupSourceProvider();
        while (!lookupSourceProvider.isDone()) {
            for (Driver buildDriver : buildDrivers) {
                TestHashJoinOperator.checkErrors(taskStateMachine);
                buildDriver.process();
            }
        }
        ((LookupSourceProvider)MoreFutures.getFutureValue((Future)lookupSourceProvider)).close();
        for (int i = 0; i < buildOperatorCount; ++i) {
            TestHashJoinOperator.revokeMemory(buildSideSetup.getBuildOperators().get(i));
        }
        for (Driver buildDriver : buildDrivers) {
            TestHashJoinOperator.runDriverInThread(this.executor, buildDriver);
        }
        ValuesOperator.ValuesOperatorFactory valuesOperatorFactory1 = new ValuesOperator.ValuesOperatorFactory(17, new PlanNodeId("values1"), probe1Pages.build());
        ValuesOperator.ValuesOperatorFactory valuesOperatorFactory2 = new ValuesOperator.ValuesOperatorFactory(18, new PlanNodeId("values2"), probe2Pages.build());
        ValuesOperator.ValuesOperatorFactory valuesOperatorFactory3 = new ValuesOperator.ValuesOperatorFactory(18, new PlanNodeId("values3"), (List)ImmutableList.of());
        PageBuffer pageBuffer = new PageBuffer(10);
        PageBufferOperator.PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperator.PageBufferOperatorFactory(19, new PlanNodeId(PAGE_BUFFER), pageBuffer, PAGE_BUFFER);
        Driver joinDriver1 = Driver.createDriver((DriverContext)joinDriverContext1, (Operator)valuesOperatorFactory1.createOperator(joinDriverContext1), (Operator[])new Operator[]{lookupOperator1, pageBufferOperatorFactory.createOperator(joinDriverContext1)});
        Driver joinDriver2 = Driver.createDriver((DriverContext)joinDriverContext2, (Operator)valuesOperatorFactory2.createOperator(joinDriverContext2), (Operator[])new Operator[]{lookupOperator2, pageBufferOperatorFactory.createOperator(joinDriverContext2)});
        Driver joinDriver3 = Driver.createDriver((DriverContext)joinDriverContext3, (Operator)valuesOperatorFactory3.createOperator(joinDriverContext3), (Operator[])new Operator[]{lookupOperator3, pageBufferOperatorFactory.createOperator(joinDriverContext3)});
        joinDriver3.close();
        joinDriver3.process();
        while (!called.get()) {
            TestHashJoinOperator.checkErrors(taskStateMachine);
            TestHashJoinOperator.processRow(joinDriver1, taskStateMachine);
            TestHashJoinOperator.processRow(joinDriver2, taskStateMachine);
        }
        joinDriver1.close();
        joinDriver1.process();
        while (!joinDriver2.isFinished()) {
            TestHashJoinOperator.processRow(joinDriver2, taskStateMachine);
        }
        TestHashJoinOperator.checkErrors(taskStateMachine);
        List<Page> pages = TestHashJoinOperator.getPages(pageBuffer);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probe2Pages.getTypesWithoutHash(), buildPages.getTypesWithoutHash())).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"20", 123000L, "20", 200L}).row(new Object[]{"30", 123000L, "30", 300L}).row(new Object[]{"31", 123001L, "31", 301L}).row(new Object[]{"32", 123002L, "32", 302L}).row(new Object[]{"33", 123003L, "33", 303L}).build();
        Assertions.assertEqualsIgnoreOrder((Iterable)TestHashJoinOperator.getProperColumns(lookupOperator1, TestHashJoinOperator.concat(probe2Pages.getTypes(), buildPages.getTypes()), probe2Pages, pages).getMaterializedRows(), (Iterable)expected.getMaterializedRows());
    }

    private static void processRow(Driver joinDriver, TaskStateMachine taskStateMachine) {
        joinDriver.getDriverContext().getYieldSignal().setWithDelay(TimeUnit.SECONDS.toNanos(1L), joinDriver.getDriverContext().getYieldExecutor());
        joinDriver.process();
        joinDriver.getDriverContext().getYieldSignal().reset();
        TestHashJoinOperator.checkErrors(taskStateMachine);
    }

    private static void checkErrors(TaskStateMachine taskStateMachine) {
        if (taskStateMachine.getFailureCauses().size() > 0) {
            Throwable exception = Objects.requireNonNull((Throwable)taskStateMachine.getFailureCauses().peek());
            throw new RuntimeException(exception.getMessage(), exception);
        }
    }

    private static void revokeMemory(HashBuilderOperator operator) {
        MoreFutures.getFutureValue((Future)operator.startMemoryRevoke());
        operator.finishMemoryRevoke();
        Preconditions.checkState((operator.getState() == HashBuilderOperator.State.SPILLING_INPUT || operator.getState() == HashBuilderOperator.State.INPUT_SPILLED ? 1 : 0) != 0);
    }

    private static void triggerMemoryRevokingAndWait(HashBuilderOperator operator, TaskStateMachine taskStateMachine) throws Exception {
        operator.getOperatorContext().requestMemoryRevoking();
        while (operator.getOperatorContext().isMemoryRevokingRequested()) {
            TestHashJoinOperator.checkErrors(taskStateMachine);
            Thread.sleep(10L);
        }
        TestHashJoinOperator.checkErrors(taskStateMachine);
        Preconditions.checkState((operator.getState() == HashBuilderOperator.State.SPILLING_INPUT || operator.getState() == HashBuilderOperator.State.INPUT_SPILLED ? 1 : 0) != 0);
    }

    private static List<Page> getPages(PageBuffer pageBuffer) {
        ArrayList<Page> result = new ArrayList<Page>();
        Page page = pageBuffer.poll();
        while (page != null) {
            result.add(page);
            page = pageBuffer.poll();
        }
        return result;
    }

    private static MaterializedResult getProperColumns(Operator joinOperator, List<Type> types, RowPagesBuilder probePages, List<Page> actualPages) {
        if (probePages.getHashChannel().isPresent()) {
            ImmutableList hashChannels = ImmutableList.of((Object)probePages.getHashChannel().get());
            actualPages = OperatorAssertion.dropChannel(actualPages, (List<Integer>)hashChannels);
            types = OperatorAssertion.without(types, (Collection<Integer>)hashChannels);
        }
        return OperatorAssertion.toMaterializedResult(joinOperator.getOperatorContext().getSession(), types, actualPages);
    }

    @Test(timeOut=40000L)
    public void testBuildGracefulSpill() throws Exception {
        TaskStateMachine taskStateMachine = new TaskStateMachine(new TaskId("query", 0, 0, 0, 0), (Executor)this.executor);
        TaskContext taskContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)SessionTestUtils.TEST_SESSION, (TaskStateMachine)taskStateMachine);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder((Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT)).addSequencePage(4, 20, 200);
        DummySpillerFactory buildSpillerFactory = new DummySpillerFactory();
        BuildSideSetup buildSideSetup = this.setupBuildSide(true, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), true, buildSpillerFactory);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        PartitionedLookupSourceFactory lookupSourceFactory = (PartitionedLookupSourceFactory)lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide());
        lookupSourceFactory.finishProbeOperator(OptionalInt.of(1));
        HashBuilderOperator hashBuilderOperator = buildSideSetup.getBuildOperators().get(0);
        hashBuilderOperator.startMemoryRevoke().get();
        hashBuilderOperator.finishMemoryRevoke();
        hashBuilderOperator.finish();
        hashBuilderOperator.isBlocked().get();
        lookupSourceFactory.destroy();
        org.testng.Assert.assertTrue((boolean)hashBuilderOperator.isFinished());
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoinWithNullProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes).row("a").row("b").row("c");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b").build();
        OperatorFactory joinOperatorFactory = this.innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildPages.getTypesWithoutHash())).row(new Object[]{"a", "a"}).row(new Object[]{"a", "a"}).row(new Object[]{"b", "b"}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoinWithNullBuild(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes).row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row("c").build();
        OperatorFactory joinOperatorFactory = this.innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", "a"}).row(new Object[]{"a", "a"}).row(new Object[]{"b", "b"}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoinWithNullOnBothSides(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes).row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row(new Object[]{null}).row("c").build();
        OperatorFactory joinOperatorFactory = this.innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", "a"}).row(new Object[]{"a", "a"}).row(new Object[]{"b", "b"}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testProbeOuterJoin(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT)).addSequencePage(10, 20, 30, 40);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.addSequencePage(15, 20, 1020, 2020).build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"20", 1020L, 2020L, "20", 30L, 40L}).row(new Object[]{"21", 1021L, 2021L, "21", 31L, 41L}).row(new Object[]{"22", 1022L, 2022L, "22", 32L, 42L}).row(new Object[]{"23", 1023L, 2023L, "23", 33L, 43L}).row(new Object[]{"24", 1024L, 2024L, "24", 34L, 44L}).row(new Object[]{"25", 1025L, 2025L, "25", 35L, 45L}).row(new Object[]{"26", 1026L, 2026L, "26", 36L, 46L}).row(new Object[]{"27", 1027L, 2027L, "27", 37L, 47L}).row(new Object[]{"28", 1028L, 2028L, "28", 38L, 48L}).row(new Object[]{"29", 1029L, 2029L, "29", 39L, 49L}).row(new Object[]{"30", 1030L, 2030L, null, null, null}).row(new Object[]{"31", 1031L, 2031L, null, null, null}).row(new Object[]{"32", 1032L, 2032L, null, null, null}).row(new Object[]{"33", 1033L, 2033L, null, null, null}).row(new Object[]{"34", 1034L, 2034L, null, null, null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testProbeOuterJoinWithFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        TestInternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> BigintType.BIGINT.getLong(rightPage.getBlock(1), rightPosition) >= 1025L);
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT)).addSequencePage(10, 20, 30, 40);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.of(filterFunction), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.addSequencePage(15, 20, 1020, 2020).build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"20", 1020L, 2020L, null, null, null}).row(new Object[]{"21", 1021L, 2021L, null, null, null}).row(new Object[]{"22", 1022L, 2022L, null, null, null}).row(new Object[]{"23", 1023L, 2023L, null, null, null}).row(new Object[]{"24", 1024L, 2024L, null, null, null}).row(new Object[]{"25", 1025L, 2025L, "25", 35L, 45L}).row(new Object[]{"26", 1026L, 2026L, "26", 36L, 46L}).row(new Object[]{"27", 1027L, 2027L, "27", 37L, 47L}).row(new Object[]{"28", 1028L, 2028L, "28", 38L, 48L}).row(new Object[]{"29", 1029L, 2029L, "29", 39L, 49L}).row(new Object[]{"30", 1030L, 2030L, null, null, null}).row(new Object[]{"31", 1031L, 2031L, null, null, null}).row(new Object[]{"32", 1032L, 2032L, null, null, null}).row(new Object[]{"33", 1033L, 2033L, null, null, null}).row(new Object[]{"34", 1034L, 2034L, null, null, null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testOuterJoinWithNullProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes).row("a").row("b").row("c");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b").build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", "a"}).row(new Object[]{null, null}).row(new Object[]{null, null}).row(new Object[]{"a", "a"}).row(new Object[]{"b", "b"}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testOuterJoinWithNullProbeAndFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        TestInternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> VarcharType.VARCHAR.getSlice(rightPage.getBlock(0), rightPosition).toStringAscii().equals("a"));
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes).row("a").row("b").row("c");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.of(filterFunction), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b").build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", "a"}).row(new Object[]{null, null}).row(new Object[]{null, null}).row(new Object[]{"a", "a"}).row(new Object[]{"b", null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testOuterJoinWithNullBuild(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR)).row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row("c").build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", "a"}).row(new Object[]{"a", "a"}).row(new Object[]{"b", "b"}).row(new Object[]{"c", null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testOuterJoinWithNullBuildAndFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        TestInternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> ImmutableSet.of((Object)"a", (Object)"c").contains((Object)VarcharType.VARCHAR.getSlice(rightPage.getBlock(0), rightPosition).toStringAscii()));
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR)).row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.of(filterFunction), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row("c").build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", "a"}).row(new Object[]{"a", "a"}).row(new Object[]{"b", null}).row(new Object[]{"c", null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testOuterJoinWithNullOnBothSides(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR)).row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row(new Object[]{null}).row("c").build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildPages.getTypesWithoutHash())).row(new Object[]{"a", "a"}).row(new Object[]{"a", "a"}).row(new Object[]{"b", "b"}).row(new Object[]{null, null}).row(new Object[]{"c", null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testOuterJoinWithNullOnBothSidesAndFilterFunction(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        TestInternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((leftPosition, leftPage, rightPosition, rightPage) -> ImmutableSet.of((Object)"a", (Object)"c").contains((Object)VarcharType.VARCHAR.getSlice(rightPage.getBlock(0), rightPosition).toStringAscii()));
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR)).row("a").row(new Object[]{null}).row(new Object[]{null}).row("a").row("b");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.of(filterFunction), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row(new Object[]{null}).row("c").build();
        OperatorFactory joinOperatorFactory = this.probeOuterJoinOperatorFactory(lookupSourceFactory, probePages);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildPages.getTypesWithoutHash())).row(new Object[]{"a", "a"}).row(new Object[]{"a", "a"}).row(new Object[]{"b", null}).row(new Object[]{null, null}).row(new Object[]{"c", null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(expectedExceptions={ExceededMemoryLimitException.class}, expectedExceptionsMessageRegExp="Query exceeded per-node user memory limit of.*", dataProvider="testMemoryLimitProvider")
    public void testMemoryLimit(boolean parallelBuild, boolean buildHashEnabled) {
        TaskContext taskContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)SessionTestUtils.TEST_SESSION, (DataSize)new DataSize(100.0, DataSize.Unit.BYTE));
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT)).addSequencePage(10, 20, 30, 40);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
    }

    @Test(expectedExceptions={ExceededMemoryLimitException.class}, expectedExceptionsMessageRegExp="Query exceeded per-node broadcast memory limit of.*", dataProvider="testMemoryLimitProvider")
    public void testBroadcastMemoryLimit(boolean parallelBuild, boolean buildHashEnabled) {
        TaskContext taskContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)SessionTestUtils.TEST_SESSION, (DataSize)new DataSize(100.0, DataSize.Unit.MEGABYTE));
        taskContext.getQueryContext().setMemoryLimits(new DataSize(512.0, DataSize.Unit.MEGABYTE), new DataSize(512.0, DataSize.Unit.MEGABYTE), new DataSize(100.0, DataSize.Unit.BYTE), new DataSize(512.0, DataSize.Unit.MEGABYTE));
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT)).addSequencePage(10, 20, 30, 40);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY, true);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
    }

    @Test(expectedExceptions={ExceededMemoryLimitException.class}, expectedExceptionsMessageRegExp="Query exceeded per-node user memory limit of.* \\[Estimated Spilled:.*")
    public void testSpillMemoryLimit() {
        Session session = TestingSession.testSessionBuilder().setSystemProperty("query_max_memory_per_node", "1000B").build();
        TaskContext taskContext = TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)session, (DataSize)SystemSessionProperties.getQueryMaxMemoryPerNode((Session)session));
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(true, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)ImmutableList.of((Object)VarcharType.VARCHAR, (Object)BigintType.BIGINT, (Object)BigintType.BIGINT)).addSequencePage(1000, 2000, 3000, 4000);
        BuildSideSetup buildSideSetup = this.setupBuildSide(true, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), true, SINGLE_STREAM_SPILLER_FACTORY);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        PartitionedLookupSourceFactory lookupSourceFactory = (PartitionedLookupSourceFactory)lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide());
        ListenableFuture lookupSourceProvider = lookupSourceFactory.createLookupSourceProvider();
        List<Driver> buildDrivers = buildSideSetup.getBuildDrivers();
        while (!lookupSourceProvider.isDone()) {
            for (int i = 0; i < buildDrivers.size(); ++i) {
                TestHashJoinOperator.revokeMemory(buildSideSetup.getBuildOperators().get(i));
                buildDrivers.get(i).process();
            }
        }
        ((LookupSourceProvider)MoreFutures.getFutureValue((Future)lookupSourceProvider)).close();
        for (Driver buildDriver : buildDrivers) {
            TestHashJoinOperator.runDriverInThread(this.executor, buildDriver);
        }
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        OperatorFactory joinOperatorFactory = new LookupJoinOperators().innerJoin(0, new PlanNodeId("test"), lookupSourceFactoryManager, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), OptionalInt.of(1), PARTITIONING_SPILLER_FACTORY, false);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        Operator operator = joinOperatorFactory.createOperator(taskContext.addPipelineContext(0, true, true, false).addDriverContext());
        List<Page> pages = probePages.row("test").build();
        operator.addInput(pages.get(0));
        Page outputPage = operator.getOutput();
        org.testng.Assert.assertNull((Object)outputPage);
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testLookupOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        OperatorFactory joinOperatorFactory = new LookupJoinOperators().lookupOuterJoin(0, new PlanNodeId("test"), lookupSourceFactoryManager, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), OptionalInt.of(1), PARTITIONING_SPILLER_FACTORY, false);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        Operator operator = joinOperatorFactory.createOperator(taskContext.addPipelineContext(0, true, true, false).addDriverContext());
        List<Page> pages = probePages.row("test").build();
        operator.addInput(pages.get(0));
        Page outputPage = operator.getOutput();
        org.testng.Assert.assertNull((Object)outputPage);
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testProbeOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row(new Object[]{null}).row("c").build();
        OperatorFactory joinOperatorFactory = new LookupJoinOperators().probeOuterJoin(0, new PlanNodeId("test"), lookupSourceFactoryManager, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), OptionalInt.of(1), PARTITIONING_SPILLER_FACTORY, false);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", null}).row(new Object[]{"b", null}).row(new Object[]{null, null}).row(new Object[]{"c", null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testFullOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes);
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.row("a").row("b").row(new Object[]{null}).row("c").build();
        OperatorFactory joinOperatorFactory = new LookupJoinOperators().fullOuterJoin(0, new PlanNodeId("test"), lookupSourceFactoryManager, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), OptionalInt.of(1), PARTITIONING_SPILLER_FACTORY, false);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).row(new Object[]{"a", null}).row(new Object[]{"b", null}).row(new Object[]{null, null}).row(new Object[]{"c", null}).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @Test(dataProvider="hashJoinTestValues")
    public void testInnerJoinWithNonEmptyLookupSourceAndEmptyProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) {
        TaskContext taskContext = this.createTaskContext();
        ImmutableList buildTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder buildPages = RowPagesBuilder.rowPagesBuilder(buildHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)buildTypes).row("a").row("b").row(new Object[]{null}).row("c");
        BuildSideSetup buildSideSetup = this.setupBuildSide(parallelBuild, taskContext, Ints.asList((int[])new int[]{0}), buildPages, Optional.empty(), false, SINGLE_STREAM_SPILLER_FACTORY);
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
        ImmutableList probeTypes = ImmutableList.of((Object)VarcharType.VARCHAR);
        RowPagesBuilder probePages = RowPagesBuilder.rowPagesBuilder(probeHashEnabled, (List<Integer>)Ints.asList((int[])new int[]{0}), (Iterable<Type>)probeTypes);
        List<Page> probeInput = probePages.build();
        OperatorFactory joinOperatorFactory = new LookupJoinOperators().innerJoin(0, new PlanNodeId("test"), lookupSourceFactoryManager, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), OptionalInt.of(1), PARTITIONING_SPILLER_FACTORY, false);
        this.instantiateBuildDrivers(buildSideSetup, taskContext);
        this.buildLookupSource(buildSideSetup);
        MaterializedResult expected = MaterializedResult.resultBuilder((Session)taskContext.getSession(), TestHashJoinOperator.concat(probeTypes, buildTypes)).build();
        OperatorAssertion.assertOperatorEquals(joinOperatorFactory, taskContext.addPipelineContext(0, true, true, false).addDriverContext(), probeInput, expected, true, TestHashJoinOperator.getHashChannels(probePages, buildPages));
    }

    @DataProvider
    public static Object[][] testMemoryLimitProvider() {
        return new Object[][]{{true, true}, {true, false}, {false, true}, {false, false}};
    }

    private TaskContext createTaskContext() {
        return TestingTaskContext.createTaskContext((Executor)this.executor, (ScheduledExecutorService)this.scheduledExecutor, (Session)SessionTestUtils.TEST_SESSION);
    }

    private static List<Integer> getHashChannels(RowPagesBuilder probe, RowPagesBuilder build) {
        ImmutableList.Builder hashChannels = ImmutableList.builder();
        if (probe.getHashChannel().isPresent()) {
            hashChannels.add((Object)probe.getHashChannel().get());
        }
        if (build.getHashChannel().isPresent()) {
            hashChannels.add((Object)(probe.getTypes().size() + build.getHashChannel().get()));
        }
        return hashChannels.build();
    }

    private OperatorFactory probeOuterJoinOperatorFactory(JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager, RowPagesBuilder probePages) {
        return LOOKUP_JOIN_OPERATORS.probeOuterJoin(0, new PlanNodeId("test"), lookupSourceFactoryManager, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), OptionalInt.of(1), PARTITIONING_SPILLER_FACTORY, false);
    }

    private OperatorFactory innerJoinOperatorFactory(JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager, RowPagesBuilder probePages, PartitioningSpillerFactory partitioningSpillerFactory) {
        return this.innerJoinOperatorFactory(lookupSourceFactoryManager, probePages, partitioningSpillerFactory, OptionalInt.of(1));
    }

    private OperatorFactory innerJoinOperatorFactory(JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager, RowPagesBuilder probePages, PartitioningSpillerFactory partitioningSpillerFactory, OptionalInt totalOperatorsCount) {
        return LOOKUP_JOIN_OPERATORS.innerJoin(0, new PlanNodeId("test"), lookupSourceFactoryManager, probePages.getTypes(), Ints.asList((int[])new int[]{0}), TestHashJoinOperator.getHashChannelAsInt(probePages), Optional.empty(), totalOperatorsCount, partitioningSpillerFactory, false);
    }

    private BuildSideSetup setupBuildSide(boolean parallelBuild, TaskContext taskContext, List<Integer> hashChannels, RowPagesBuilder buildPages, Optional<InternalJoinFilterFunction> filterFunction, boolean spillEnabled, SingleStreamSpillerFactory singleStreamSpillerFactory) {
        return this.setupBuildSide(parallelBuild, taskContext, hashChannels, buildPages, filterFunction, spillEnabled, singleStreamSpillerFactory, false);
    }

    private BuildSideSetup setupBuildSide(boolean parallelBuild, TaskContext taskContext, List<Integer> hashChannels, RowPagesBuilder buildPages, Optional<InternalJoinFilterFunction> filterFunction, boolean spillEnabled, SingleStreamSpillerFactory singleStreamSpillerFactory, boolean enforceBroadcastMemoryLimit) {
        Optional<JoinFilterFunctionCompiler.JoinFilterFunctionFactory> filterFunctionFactory = filterFunction.map(function -> (session, addresses, pages) -> new StandardJoinFilterFunction(function, addresses, pages));
        int partitionCount = parallelBuild ? 4 : 1;
        LocalExchange.LocalExchangeFactory localExchangeFactory = new LocalExchange.LocalExchangeFactory(this.partitioningProviderManager, this.session, SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, partitionCount, buildPages.getTypes(), hashChannels, buildPages.getHashChannel(), PipelineExecutionStrategy.UNGROUPED_EXECUTION, new DataSize(32.0, DataSize.Unit.MEGABYTE));
        LocalExchange.LocalExchangeSinkFactoryId localExchangeSinkFactoryId = localExchangeFactory.newSinkFactoryId();
        localExchangeFactory.noMoreSinkFactories();
        DriverContext collectDriverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        ValuesOperator.ValuesOperatorFactory valuesOperatorFactory = new ValuesOperator.ValuesOperatorFactory(0, new PlanNodeId("values"), buildPages.build());
        LocalExchangeSinkOperator.LocalExchangeSinkOperatorFactory sinkOperatorFactory = new LocalExchangeSinkOperator.LocalExchangeSinkOperatorFactory(localExchangeFactory, 1, new PlanNodeId("sink"), localExchangeSinkFactoryId, Function.identity());
        Driver sourceDriver = Driver.createDriver((DriverContext)collectDriverContext, (Operator)valuesOperatorFactory.createOperator(collectDriverContext), (Operator[])new Operator[]{sinkOperatorFactory.createOperator(collectDriverContext)});
        valuesOperatorFactory.noMoreOperators();
        sinkOperatorFactory.noMoreOperators();
        while (!sourceDriver.isFinished()) {
            sourceDriver.process();
        }
        LocalExchangeSourceOperator.LocalExchangeSourceOperatorFactory sourceOperatorFactory = new LocalExchangeSourceOperator.LocalExchangeSourceOperatorFactory(0, new PlanNodeId("source"), localExchangeFactory);
        JoinBridgeManager lookupSourceFactoryManager = JoinBridgeManager.lookupAllAtOnce((PartitionedLookupSourceFactory)new PartitionedLookupSourceFactory(buildPages.getTypes(), (List)TestHashJoinOperator.rangeList(buildPages.getTypes().size()).stream().map(buildPages.getTypes()::get).collect(ImmutableList.toImmutableList()), (List)hashChannels.stream().map(buildPages.getTypes()::get).collect(ImmutableList.toImmutableList()), partitionCount, (Map)Objects.requireNonNull(ImmutableMap.of(), "layout is null"), false));
        HashBuilderOperator.HashBuilderOperatorFactory buildOperatorFactory = new HashBuilderOperator.HashBuilderOperatorFactory(1, new PlanNodeId("build"), lookupSourceFactoryManager, TestHashJoinOperator.rangeList(buildPages.getTypes().size()), hashChannels, buildPages.getHashChannel().map(OptionalInt::of).orElse(OptionalInt.empty()), filterFunctionFactory, Optional.empty(), (List)ImmutableList.of(), 100, (PagesIndex.Factory)new PagesIndex.TestingFactory(false), spillEnabled, singleStreamSpillerFactory, enforceBroadcastMemoryLimit);
        return new BuildSideSetup((JoinBridgeManager<PartitionedLookupSourceFactory>)lookupSourceFactoryManager, buildOperatorFactory, sourceOperatorFactory, partitionCount);
    }

    private void instantiateBuildDrivers(BuildSideSetup buildSideSetup, TaskContext taskContext) {
        PipelineContext buildPipeline = taskContext.addPipelineContext(1, true, true, false);
        ArrayList<Driver> buildDrivers = new ArrayList<Driver>();
        ArrayList<HashBuilderOperator> buildOperators = new ArrayList<HashBuilderOperator>();
        for (int i = 0; i < buildSideSetup.getPartitionCount(); ++i) {
            DriverContext buildDriverContext = buildPipeline.addDriverContext();
            HashBuilderOperator buildOperator = buildSideSetup.getBuildOperatorFactory().createOperator(buildDriverContext);
            Driver driver = Driver.createDriver((DriverContext)buildDriverContext, (Operator)buildSideSetup.getBuildSideSourceOperatorFactory().createOperator(buildDriverContext), (Operator[])new Operator[]{buildOperator});
            buildDrivers.add(driver);
            buildOperators.add(buildOperator);
        }
        buildSideSetup.setDriversAndOperators(buildDrivers, buildOperators);
    }

    private void buildLookupSource(BuildSideSetup buildSideSetup) {
        Objects.requireNonNull(buildSideSetup, "buildSideSetup is null");
        LookupSourceFactory lookupSourceFactory = (LookupSourceFactory)buildSideSetup.getLookupSourceFactoryManager().getJoinBridge(Lifespan.taskWide());
        ListenableFuture lookupSourceProvider = lookupSourceFactory.createLookupSourceProvider();
        List<Driver> buildDrivers = buildSideSetup.getBuildDrivers();
        while (!lookupSourceProvider.isDone()) {
            for (Driver buildDriver : buildDrivers) {
                buildDriver.process();
            }
        }
        ((LookupSourceProvider)MoreFutures.getFutureValue((Future)lookupSourceProvider)).close();
        for (Driver buildDriver : buildDrivers) {
            TestHashJoinOperator.runDriverInThread(this.executor, buildDriver);
        }
    }

    private static void runDriverInThread(ExecutorService executor, Driver driver) {
        executor.execute(() -> {
            if (!driver.isFinished()) {
                try {
                    driver.process();
                }
                catch (PrestoException e) {
                    driver.getDriverContext().failed((Throwable)e);
                    throw e;
                }
                TestHashJoinOperator.runDriverInThread(executor, driver);
            }
        });
    }

    private static OptionalInt getHashChannelAsInt(RowPagesBuilder probePages) {
        return probePages.getHashChannel().map(OptionalInt::of).orElse(OptionalInt.empty());
    }

    private static List<Integer> rangeList(int endExclusive) {
        return (List)IntStream.range(0, endExclusive).boxed().collect(ImmutableList.toImmutableList());
    }

    private static <T> List<List<T>> product(List<List<T>> left, List<List<T>> right) {
        ArrayList<List<T>> result = new ArrayList<List<T>>();
        for (List<T> l : left) {
            for (List<T> r : right) {
                result.add(TestHashJoinOperator.concat(l, r));
            }
        }
        return result;
    }

    private static <T> List<T> concat(List<T> initialElements, List<T> moreElements) {
        return ImmutableList.copyOf((Iterable)Iterables.concat(initialElements, moreElements));
    }

    private static class BuildSideSetup {
        private final JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager;
        private final HashBuilderOperator.HashBuilderOperatorFactory buildOperatorFactory;
        private final LocalExchangeSourceOperator.LocalExchangeSourceOperatorFactory buildSideSourceOperatorFactory;
        private final int partitionCount;
        private List<Driver> buildDrivers;
        private List<HashBuilderOperator> buildOperators;

        BuildSideSetup(JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager, HashBuilderOperator.HashBuilderOperatorFactory buildOperatorFactory, LocalExchangeSourceOperator.LocalExchangeSourceOperatorFactory buildSideSourceOperatorFactory, int partitionCount) {
            this.lookupSourceFactoryManager = Objects.requireNonNull(lookupSourceFactoryManager, "lookupSourceFactoryManager is null");
            this.buildOperatorFactory = Objects.requireNonNull(buildOperatorFactory, "buildOperatorFactory is null");
            this.buildSideSourceOperatorFactory = buildSideSourceOperatorFactory;
            this.partitionCount = partitionCount;
        }

        void setDriversAndOperators(List<Driver> buildDrivers, List<HashBuilderOperator> buildOperators) {
            Preconditions.checkArgument((buildDrivers.size() == buildOperators.size() ? 1 : 0) != 0);
            this.buildDrivers = ImmutableList.copyOf(buildDrivers);
            this.buildOperators = ImmutableList.copyOf(buildOperators);
        }

        JoinBridgeManager<PartitionedLookupSourceFactory> getLookupSourceFactoryManager() {
            return this.lookupSourceFactoryManager;
        }

        HashBuilderOperator.HashBuilderOperatorFactory getBuildOperatorFactory() {
            return this.buildOperatorFactory;
        }

        public LocalExchangeSourceOperator.LocalExchangeSourceOperatorFactory getBuildSideSourceOperatorFactory() {
            return this.buildSideSourceOperatorFactory;
        }

        public int getPartitionCount() {
            return this.partitionCount;
        }

        List<Driver> getBuildDrivers() {
            Preconditions.checkState((this.buildDrivers != null ? 1 : 0) != 0, (Object)"buildDrivers is not initialized yet");
            return this.buildDrivers;
        }

        List<HashBuilderOperator> getBuildOperators() {
            Preconditions.checkState((this.buildOperators != null ? 1 : 0) != 0, (Object)"buildDrivers is not initialized yet");
            return this.buildOperators;
        }
    }

    private static class TestInternalJoinFilterFunction
    implements InternalJoinFilterFunction {
        private final Lambda lambda;

        private TestInternalJoinFilterFunction(Lambda lambda) {
            this.lambda = lambda;
        }

        public boolean filter(int leftPosition, Page leftPage, int rightPosition, Page rightPage) {
            return this.lambda.filter(leftPosition, leftPage, rightPosition, rightPage);
        }

        public static interface Lambda {
            public boolean filter(int var1, Page var2, int var3, Page var4);
        }
    }

    private static enum WhenSpillFails {
        SPILL_BUILD,
        SPILL_JOIN,
        UNSPILL_BUILD,
        UNSPILL_JOIN;

    }

    private static enum WhenSpill {
        DURING_BUILD,
        AFTER_BUILD,
        DURING_USAGE,
        NEVER;

    }

    private static class DummySpillerFactory
    implements SingleStreamSpillerFactory {
        private volatile boolean failSpill;
        private volatile boolean failUnspill;

        private DummySpillerFactory() {
        }

        void failSpill() {
            this.failSpill = true;
        }

        void failUnspill() {
            this.failUnspill = true;
        }

        public SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext) {
            return new SingleStreamSpiller(){
                private boolean writing = true;
                private final List<Page> spills = new ArrayList<Page>();

                public ListenableFuture<?> spill(Iterator<Page> pageIterator) {
                    Preconditions.checkState((boolean)this.writing, (Object)"writing already finished");
                    if (failSpill) {
                        return Futures.immediateFailedFuture((Throwable)new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Spill failed"));
                    }
                    Iterators.addAll(this.spills, pageIterator);
                    return Futures.immediateFuture(null);
                }

                public Iterator<Page> getSpilledPages() {
                    if (failUnspill) {
                        throw new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Unspill failed");
                    }
                    this.writing = false;
                    return Iterators.unmodifiableIterator(this.spills.iterator());
                }

                public long getSpilledPagesInMemorySize() {
                    return this.spills.stream().mapToLong(Page::getSizeInBytes).sum();
                }

                public ListenableFuture<List<Page>> getAllSpilledPages() {
                    if (failUnspill) {
                        return Futures.immediateFailedFuture((Throwable)new PrestoException((ErrorCodeSupplier)StandardErrorCode.GENERIC_INTERNAL_ERROR, "Unspill failed"));
                    }
                    this.writing = false;
                    return Futures.immediateFuture((Object)ImmutableList.copyOf(this.spills));
                }

                public void commit() {
                    this.writing = false;
                }

                public void close() {
                    this.writing = false;
                }
            };
        }
    }
}

