/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.opt.physical;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.sql.impl.ExpressionUtil;
import com.hazelcast.jet.sql.impl.JetJoinInfo;
import com.hazelcast.jet.sql.impl.ObjectArrayKey;
import com.hazelcast.jet.sql.impl.aggregate.WindowUtils;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
import com.hazelcast.jet.sql.impl.connector.SqlConnectorUtil;
import com.hazelcast.jet.sql.impl.connector.map.IMapSqlConnector;
import com.hazelcast.jet.sql.impl.opt.ExpressionValues;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAccumulateByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAccumulatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateCombineByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateCombinePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.DeletePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.FilterPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.FullScanPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.IndexScanMapPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.InsertPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.JoinHashPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.JoinNestedLoopPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.PhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.ProjectPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.RootRel;
import com.hazelcast.jet.sql.impl.opt.physical.SinkPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SlidingWindowAggregatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SlidingWindowPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SortPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UnionPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UpdatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.ValuesPhysicalRel;
import com.hazelcast.jet.sql.impl.processors.RootResultConsumerSink;
import com.hazelcast.jet.sql.impl.processors.SqlHashJoinP;
import com.hazelcast.jet.sql.impl.schema.HazelcastTable;
import com.hazelcast.org.apache.calcite.rel.RelNode;
import com.hazelcast.org.apache.calcite.rel.SingleRel;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.expression.ConstantExpression;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.optimizer.PlanObjectKey;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.type.QueryDataType;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;

public class CreateDagVisitor {
    private static final ExpressionEvalContext MOCK_EEC = new ExpressionEvalContext(Collections.emptyList(), new DefaultSerializationServiceBuilder().build());
    private static final int LOW_PRIORITY = 10;
    private static final int HIGH_PRIORITY = 1;
    private final DAG dag = new DAG();
    private final Set<PlanObjectKey> objectKeys = new HashSet<PlanObjectKey>();
    private final NodeEngine nodeEngine;
    private final Address localMemberAddress;
    private final QueryParameterMetadata parameterMetadata;

    public CreateDagVisitor(NodeEngine nodeEngine, QueryParameterMetadata parameterMetadata) {
        this.nodeEngine = nodeEngine;
        this.localMemberAddress = nodeEngine.getThisAddress();
        this.parameterMetadata = parameterMetadata;
    }

    public Vertex onValues(ValuesPhysicalRel rel) {
        List<ExpressionValues> values = rel.values();
        return this.dag.newUniqueVertex("Values", SourceProcessors.convenientSourceP(ExpressionEvalContext::from, (BiConsumerEx & Serializable)(context, buffer) -> {
            values.forEach(vs -> vs.toValues((ExpressionEvalContext)context).forEach(arg_0 -> ((SourceBuilder.SourceBuffer)buffer).add(arg_0)));
            buffer.close();
        }, (FunctionEx & Serializable)ctx -> null, (BiConsumerEx & Serializable)(ctx, states) -> {}, (ConsumerEx)ConsumerEx.noop(), (int)0, (boolean)true, null));
    }

    public Vertex onInsert(InsertPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        SqlConnector.VertexWithInputConfig vertexWithConfig = SqlConnectorUtil.getJetSqlConnector(table).insertProcessor(this.dag, (Table)table);
        Vertex vertex = vertexWithConfig.vertex();
        this.connectInput(rel.getInput(), vertex, vertexWithConfig.configureEdgeFn());
        return vertex;
    }

    public Vertex onSink(SinkPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).sinkProcessor(this.dag, (Table)table);
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onUpdate(UpdatePhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).updateProcessor(this.dag, (Table)table, rel.updates(this.parameterMetadata));
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onDelete(DeletePhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).deleteProcessor(this.dag, (Table)table);
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onFullScan(FullScanPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        return SqlConnectorUtil.getJetSqlConnector(table).fullScanReader(this.dag, (Table)table, rel.filter(this.parameterMetadata), rel.projection(this.parameterMetadata), rel.eventTimePolicyProvider());
    }

    public Vertex onMapIndexScan(IndexScanMapPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        return ((IMapSqlConnector)SqlConnectorUtil.getJetSqlConnector(table)).indexScanReader(this.dag, this.localMemberAddress, (Table)table, rel.getIndex(), rel.filter(this.parameterMetadata), rel.projection(this.parameterMetadata), rel.getIndexFilter(), rel.getComparator(), rel.isDescending());
    }

    public Vertex onFilter(FilterPhysicalRel rel) {
        Expression<Boolean> filter = rel.filter(this.parameterMetadata);
        Vertex vertex = this.dag.newUniqueVertex("Filter", Processors.filterUsingServiceP((ServiceFactory)ServiceFactories.nonSharedService((FunctionEx & Serializable)ctx -> ExpressionUtil.filterFn(filter, ExpressionEvalContext.from((ProcessorSupplier.Context)ctx))), (BiPredicateEx & Serializable)(filterFn, row) -> filterFn.test(row)));
        this.connectInputPreserveCollation(rel, vertex);
        return vertex;
    }

    public Vertex onProject(ProjectPhysicalRel rel) {
        List<Expression<?>> projection = rel.projection(this.parameterMetadata);
        Vertex vertex = this.dag.newUniqueVertex("Project", Processors.mapUsingServiceP((ServiceFactory)ServiceFactories.nonSharedService((FunctionEx & Serializable)ctx -> ExpressionUtil.projectionFn(projection, ExpressionEvalContext.from((ProcessorSupplier.Context)ctx))), (BiFunctionEx & Serializable)(projectionFn, row) -> (JetSqlRow)projectionFn.apply(row)));
        this.connectInputPreserveCollation(rel, vertex);
        return vertex;
    }

    public Vertex onSort(SortPhysicalRel rel) {
        ComparatorEx<JetSqlRow> comparator = ExpressionUtil.comparisonFn(rel.getCollations());
        Vertex sortVertex = this.dag.newUniqueVertex("Sort", ProcessorMetaSupplier.of((SupplierEx)Processors.sortP(comparator)));
        this.connectInput(rel.getInput(), sortVertex, null);
        Vertex combineVertex = this.dag.newUniqueVertex("SortCombine", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)ProcessorSupplier.of((SupplierEx)Processors.mapP((FunctionEx)FunctionEx.identity())), (Address)this.localMemberAddress));
        Edge edge = Edge.between((Vertex)sortVertex, (Vertex)combineVertex).ordered(comparator).distributeTo(this.localMemberAddress).allToOne((Object)"");
        this.dag.edge(edge);
        return combineVertex;
    }

    public Vertex onAggregate(AggregatePhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Aggregate", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)ProcessorSupplier.of((SupplierEx)Processors.aggregateP(aggregateOperation)), (Address)this.localMemberAddress));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne((Object)""));
        return vertex;
    }

    public Vertex onAccumulate(AggregateAccumulatePhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Accumulate", Processors.accumulateP(aggregateOperation));
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onCombine(AggregateCombinePhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Combine", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)ProcessorSupplier.of((SupplierEx)Processors.combineP(aggregateOperation)), (Address)this.localMemberAddress));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne((Object)""));
        return vertex;
    }

    public Vertex onAggregateByKey(AggregateByKeyPhysicalRel rel) {
        FunctionEx<JetSqlRow, ObjectArrayKey> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("AggregateByKey", Processors.aggregateByKeyP(Collections.singletonList(groupKeyFn), aggregateOperation, (BiFunctionEx & Serializable)(key, value) -> value));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributed().partitioned(groupKeyFn));
        return vertex;
    }

    public Vertex onAccumulateByKey(AggregateAccumulateByKeyPhysicalRel rel) {
        FunctionEx<JetSqlRow, ObjectArrayKey> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("AccumulateByKey", Processors.accumulateByKeyP(Collections.singletonList(groupKeyFn), aggregateOperation));
        this.connectInput(rel.getInput(), vertex, edge -> edge.partitioned(groupKeyFn));
        return vertex;
    }

    public Vertex onCombineByKey(AggregateCombineByKeyPhysicalRel rel) {
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("CombineByKey", Processors.combineByKeyP(aggregateOperation, (BiFunctionEx & Serializable)(key, value) -> value));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributed().partitioned(Functions.entryKey()));
        return vertex;
    }

    public Vertex onSlidingWindow(SlidingWindowPhysicalRel rel) {
        int orderingFieldIndex = rel.orderingFieldIndex();
        FunctionEx<ExpressionEvalContext, SlidingWindowPolicy> windowPolicySupplier = rel.windowPolicyProvider();
        Vertex vertex = this.dag.newUniqueVertex("Sliding-Window", Processors.flatMapUsingServiceP((ServiceFactory)ServiceFactories.nonSharedService((FunctionEx & Serializable)ctx -> {
            ExpressionEvalContext evalContext = ExpressionEvalContext.from((ProcessorSupplier.Context)ctx);
            SlidingWindowPolicy windowPolicy = (SlidingWindowPolicy)windowPolicySupplier.apply((Object)evalContext);
            return row -> WindowUtils.addWindowBounds(row, orderingFieldIndex, windowPolicy);
        }), Function::apply));
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onSlidingWindowAggregate(SlidingWindowAggregatePhysicalRel rel) {
        FunctionEx<JetSqlRow, ?> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, JetSqlRow> aggregateOperation = rel.aggrOp();
        Expression<?> timestampExpression = rel.timestampExpression();
        ToLongFunctionEx & Serializable timestampFn = (ToLongFunctionEx & Serializable)row -> WindowUtils.extractMillis(timestampExpression.eval(row.getRow(), MOCK_EEC));
        SlidingWindowPolicy windowPolicy = (SlidingWindowPolicy)rel.windowPolicyProvider().apply((Object)MOCK_EEC);
        KeyedWindowResultFunction<? super Object, ? super JetSqlRow, ?> resultMapping = rel.outputValueMapping();
        if (rel.numStages() == 1) {
            Vertex vertex = this.dag.newUniqueVertex("Sliding-Window-AggregateByKey", Processors.aggregateToSlidingWindowP(Collections.singletonList(groupKeyFn), Collections.singletonList(timestampFn), (TimestampKind)TimestampKind.EVENT, (SlidingWindowPolicy)windowPolicy, (long)0L, aggregateOperation, resultMapping));
            this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne((Object)""));
            return vertex;
        }
        assert (rel.numStages() == 2);
        Vertex vertex1 = this.dag.newUniqueVertex("Sliding-Window-AccumulateByKey", Processors.accumulateByFrameP(Collections.singletonList(groupKeyFn), Collections.singletonList(timestampFn), (TimestampKind)TimestampKind.EVENT, (SlidingWindowPolicy)windowPolicy, aggregateOperation));
        Vertex vertex2 = this.dag.newUniqueVertex("Sliding-Window-CombineByKey", Processors.combineToSlidingWindowP((SlidingWindowPolicy)windowPolicy, aggregateOperation, resultMapping));
        this.connectInput(rel.getInput(), vertex1, edge -> edge.partitioned(groupKeyFn));
        this.dag.edge(Edge.between((Vertex)vertex1, (Vertex)vertex2).distributed().partitioned(Functions.entryKey()));
        return vertex2;
    }

    public Vertex onNestedLoopJoin(JoinNestedLoopPhysicalRel rel) {
        assert (rel.getRight() instanceof FullScanPhysicalRel) : rel.getRight().getClass();
        Object rightTable = rel.getRight().getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)rightTable);
        SqlConnector.VertexWithInputConfig vertexWithConfig = SqlConnectorUtil.getJetSqlConnector(rightTable).nestedLoopReader(this.dag, (Table)rightTable, rel.rightFilter(this.parameterMetadata), rel.rightProjection(this.parameterMetadata), rel.joinInfo(this.parameterMetadata));
        Vertex vertex = vertexWithConfig.vertex();
        this.connectInput(rel.getLeft(), vertex, vertexWithConfig.configureEdgeFn());
        return vertex;
    }

    public Vertex onHashJoin(JoinHashPhysicalRel rel) {
        JetJoinInfo joinInfo = rel.joinInfo(this.parameterMetadata);
        Vertex joinVertex = this.dag.newUniqueVertex("Hash Join", (ProcessorSupplier)SqlHashJoinP.supplier(joinInfo, rel.getRight().getRowType().getFieldCount()));
        this.connectJoinInput(joinInfo, rel.getLeft(), rel.getRight(), joinVertex);
        return joinVertex;
    }

    public Vertex onUnion(UnionPhysicalRel rel) {
        if (!rel.all) {
            throw new RuntimeException("Union[all=false] rel should never be produced");
        }
        Vertex merger = this.dag.newUniqueVertex("UnionMerger", ProcessorSupplier.of((SupplierEx)Processors.mapP((FunctionEx)FunctionEx.identity())));
        int ordinal = 0;
        for (RelNode input : rel.getInputs()) {
            Vertex inputVertex = ((PhysicalRel)input).accept(this);
            Edge edge = Edge.from((Vertex)inputVertex).to(merger, ordinal++);
            this.dag.edge(edge);
        }
        return merger;
    }

    public Vertex onRoot(RootRel rootRel) {
        Object offset;
        Object fetch;
        RelNode input = rootRel.getInput();
        if (input instanceof SortPhysicalRel || this.isProjectionWithSort(input)) {
            SortPhysicalRel sortRel = input instanceof SortPhysicalRel ? (SortPhysicalRel)input : (SortPhysicalRel)((ProjectPhysicalRel)input).getInput();
            fetch = sortRel.fetch == null ? ConstantExpression.create((Object)Long.MAX_VALUE, (QueryDataType)QueryDataType.BIGINT) : sortRel.fetch(this.parameterMetadata);
            offset = sortRel.offset == null ? ConstantExpression.create((Object)0L, (QueryDataType)QueryDataType.BIGINT) : sortRel.offset(this.parameterMetadata);
            if (!sortRel.requiresSort()) {
                input = sortRel.getInput();
            }
        } else {
            fetch = ConstantExpression.create((Object)Long.MAX_VALUE, (QueryDataType)QueryDataType.BIGINT);
            offset = ConstantExpression.create((Object)0L, (QueryDataType)QueryDataType.BIGINT);
        }
        Vertex vertex = this.dag.newUniqueVertex("ClientSink", RootResultConsumerSink.rootResultConsumerSink(this.localMemberAddress, fetch, offset));
        this.connectInput(input, vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne((Object)""));
        return vertex;
    }

    public DAG getDag() {
        return this.dag;
    }

    public Set<PlanObjectKey> getObjectKeys() {
        return this.objectKeys;
    }

    private Vertex connectInput(RelNode inputRel, Vertex thisVertex, @Nullable Consumer<Edge> configureEdgeFn) {
        Vertex inputVertex = ((PhysicalRel)inputRel).accept(this);
        Edge edge = Edge.between((Vertex)inputVertex, (Vertex)thisVertex);
        if (configureEdgeFn != null) {
            configureEdgeFn.accept(edge);
        }
        this.dag.edge(edge);
        return inputVertex;
    }

    private void connectJoinInput(JetJoinInfo joinInfo, RelNode leftInputRel, RelNode rightInputRel, Vertex joinVertex) {
        Vertex leftInput = ((PhysicalRel)leftInputRel).accept(this);
        Vertex rightInput = ((PhysicalRel)rightInputRel).accept(this);
        Edge left = Edge.between((Vertex)leftInput, (Vertex)joinVertex).priority(10).broadcast().distributed();
        Edge right = Edge.from((Vertex)rightInput).to(joinVertex, 1).priority(1).unicast().local();
        if (joinInfo.isLeftOuter()) {
            left = left.unicast().local();
            right = right.broadcast().distributed();
        }
        if (joinInfo.isEquiJoin()) {
            left = left.distributed().partitioned(ObjectArrayKey.projectFn(joinInfo.leftEquiJoinIndices()));
            right = right.distributed().partitioned(ObjectArrayKey.projectFn(joinInfo.rightEquiJoinIndices()));
        }
        this.dag.edge(left);
        this.dag.edge(right);
    }

    private void connectInputPreserveCollation(SingleRel rel, Vertex vertex) {
        boolean preserveCollation = rel.getTraitSet().getCollation().getFieldCollations().size() > 0;
        Vertex inputVertex = this.connectInput(rel.getInput(), vertex, preserveCollation ? Edge::isolated : null);
        if (preserveCollation) {
            int cooperativeThreadCount = this.nodeEngine.getConfig().getJetConfig().getCooperativeThreadCount();
            int explicitLP = inputVertex.determineLocalParallelism(cooperativeThreadCount);
            inputVertex.determineLocalParallelism(explicitLP);
            vertex.localParallelism(explicitLP);
        }
    }

    private void collectObjectKeys(Table table) {
        PlanObjectKey objectKey = table.getObjectKey();
        if (objectKey != null) {
            this.objectKeys.add(objectKey);
        }
    }

    private boolean isProjectionWithSort(RelNode input) {
        return input instanceof ProjectPhysicalRel && ((ProjectPhysicalRel)input).getInput() instanceof SortPhysicalRel;
    }
}

