/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.opt.physical;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.sql.impl.ExpressionUtil;
import com.hazelcast.jet.sql.impl.SimpleExpressionEvalContext;
import com.hazelcast.jet.sql.impl.aggregate.ObjectArrayKey;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
import com.hazelcast.jet.sql.impl.connector.SqlConnectorUtil;
import com.hazelcast.jet.sql.impl.opt.ExpressionValues;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAccumulateByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateAccumulatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateCombineByKeyPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregateCombinePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.AggregatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.DeletePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.FilterPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.FullScanPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.InsertPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.JetRootRel;
import com.hazelcast.jet.sql.impl.opt.physical.JoinNestedLoopPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.PhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.ProjectPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SinkPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.SortPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UpdatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.ValuesPhysicalRel;
import com.hazelcast.jet.sql.impl.processors.RootResultConsumerSink;
import com.hazelcast.org.apache.calcite.rel.RelNode;
import com.hazelcast.org.apache.calcite.rel.SingleRel;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.calcite.schema.HazelcastTable;
import com.hazelcast.sql.impl.expression.ConstantExpression;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.optimizer.PlanObjectKey;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.type.QueryDataType;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;

public class CreateDagVisitor {
    private final DAG dag = new DAG();
    private final Set<PlanObjectKey> objectKeys = new HashSet<PlanObjectKey>();
    private final NodeEngine nodeEngine;
    private final Address localMemberAddress;
    private final QueryParameterMetadata parameterMetadata;

    public CreateDagVisitor(NodeEngine nodeEngine, QueryParameterMetadata parameterMetadata) {
        this.nodeEngine = nodeEngine;
        this.localMemberAddress = nodeEngine.getLocalMember().getAddress();
        this.parameterMetadata = parameterMetadata;
    }

    public Vertex onValues(ValuesPhysicalRel rel) {
        List<ExpressionValues> values = rel.values();
        return this.dag.newUniqueVertex("Values", SourceProcessors.convenientSourceP(SimpleExpressionEvalContext::from, (context, buffer) -> {
            values.forEach(vs -> vs.toValues((ExpressionEvalContext)context).forEach(buffer::add));
            buffer.close();
        }, ctx -> null, (ctx, states) -> {}, ConsumerEx.noop(), 0, true, null));
    }

    public Vertex onInsert(InsertPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        SqlConnector.VertexWithInputConfig vertexWithConfig = SqlConnectorUtil.getJetSqlConnector(table).insertProcessor(this.dag, (Table)table);
        Vertex vertex = vertexWithConfig.vertex();
        this.connectInput(rel.getInput(), vertex, vertexWithConfig.configureEdgeFn());
        return vertex;
    }

    public Vertex onSink(SinkPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).sinkProcessor(this.dag, (Table)table);
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onUpdate(UpdatePhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).updateProcessor(this.dag, (Table)table, rel.updates(this.parameterMetadata));
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onDelete(DeletePhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        Vertex vertex = SqlConnectorUtil.getJetSqlConnector(table).deleteProcessor(this.dag, (Table)table);
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onFullScan(FullScanPhysicalRel rel) {
        Object table = rel.getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)table);
        return SqlConnectorUtil.getJetSqlConnector(table).fullScanReader(this.dag, (Table)table, rel.filter(this.parameterMetadata), rel.projection(this.parameterMetadata));
    }

    public Vertex onFilter(FilterPhysicalRel rel) {
        Expression<Boolean> filter = rel.filter(this.parameterMetadata);
        Vertex vertex = this.dag.newUniqueVertex("Filter", Processors.filterUsingServiceP(ServiceFactories.nonSharedService(ctx -> ExpressionUtil.filterFn(filter, SimpleExpressionEvalContext.from(ctx))), Predicate::test));
        this.connectInputPreserveCollation(rel, vertex);
        return vertex;
    }

    public Vertex onProject(ProjectPhysicalRel rel) {
        List<Expression<?>> projection = rel.projection(this.parameterMetadata);
        Vertex vertex = this.dag.newUniqueVertex("Project", Processors.mapUsingServiceP(ServiceFactories.nonSharedService(ctx -> ExpressionUtil.projectionFn(projection, SimpleExpressionEvalContext.from(ctx))), Function::apply));
        this.connectInputPreserveCollation(rel, vertex);
        return vertex;
    }

    public Vertex onSort(SortPhysicalRel rel) {
        ComparatorEx<Object[]> comparator = ExpressionUtil.comparisonFn(rel.getCollations());
        Vertex sortVertex = this.dag.newUniqueVertex("Sort", ProcessorMetaSupplier.of(Processors.sortP(comparator)));
        this.connectInput(rel.getInput(), sortVertex, null);
        Vertex combineVertex = this.dag.newUniqueVertex("SortCombine", ProcessorMetaSupplier.forceTotalParallelismOne(ProcessorSupplier.of(Processors.mapP(FunctionEx.identity())), this.localMemberAddress));
        Edge edge = Edge.between(sortVertex, combineVertex).ordered(comparator).distributeTo(this.localMemberAddress).allToOne("");
        this.dag.edge(edge);
        return combineVertex;
    }

    public Vertex onAggregate(AggregatePhysicalRel rel) {
        AggregateOperation<?, Object[]> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Aggregate", ProcessorMetaSupplier.forceTotalParallelismOne(ProcessorSupplier.of(Processors.aggregateP(aggregateOperation)), this.localMemberAddress));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne(""));
        return vertex;
    }

    public Vertex onAccumulate(AggregateAccumulatePhysicalRel rel) {
        AggregateOperation<?, Object[]> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Accumulate", Processors.accumulateP(aggregateOperation));
        this.connectInput(rel.getInput(), vertex, null);
        return vertex;
    }

    public Vertex onCombine(AggregateCombinePhysicalRel rel) {
        AggregateOperation<?, Object[]> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("Combine", ProcessorMetaSupplier.forceTotalParallelismOne(ProcessorSupplier.of(Processors.combineP(aggregateOperation)), this.localMemberAddress));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne(""));
        return vertex;
    }

    public Vertex onAggregateByKey(AggregateByKeyPhysicalRel rel) {
        FunctionEx<Object[], ObjectArrayKey> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, Object[]> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("AggregateByKey", Processors.aggregateByKeyP(Collections.singletonList(groupKeyFn), aggregateOperation, (key, value) -> value));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributed().partitioned(groupKeyFn));
        return vertex;
    }

    public Vertex onAccumulateByKey(AggregateAccumulateByKeyPhysicalRel rel) {
        FunctionEx<Object[], ObjectArrayKey> groupKeyFn = rel.groupKeyFn();
        AggregateOperation<?, Object[]> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("AccumulateByKey", Processors.accumulateByKeyP(Collections.singletonList(groupKeyFn), aggregateOperation));
        this.connectInput(rel.getInput(), vertex, edge -> edge.partitioned(groupKeyFn));
        return vertex;
    }

    public Vertex onCombineByKey(AggregateCombineByKeyPhysicalRel rel) {
        AggregateOperation<?, Object[]> aggregateOperation = rel.aggrOp();
        Vertex vertex = this.dag.newUniqueVertex("CombineByKey", Processors.combineByKeyP(aggregateOperation, (key, value) -> value));
        this.connectInput(rel.getInput(), vertex, edge -> edge.distributed().partitioned(Functions.entryKey()));
        return vertex;
    }

    public Vertex onNestedLoopJoin(JoinNestedLoopPhysicalRel rel) {
        assert (rel.getRight() instanceof FullScanPhysicalRel) : rel.getRight().getClass();
        Object rightTable = rel.getRight().getTable().unwrap(HazelcastTable.class).getTarget();
        this.collectObjectKeys((Table)rightTable);
        SqlConnector.VertexWithInputConfig vertexWithConfig = SqlConnectorUtil.getJetSqlConnector(rightTable).nestedLoopReader(this.dag, (Table)rightTable, rel.rightFilter(this.parameterMetadata), rel.rightProjection(this.parameterMetadata), rel.joinInfo(this.parameterMetadata));
        Vertex vertex = vertexWithConfig.vertex();
        this.connectInput(rel.getLeft(), vertex, vertexWithConfig.configureEdgeFn());
        return vertex;
    }

    public Vertex onRoot(JetRootRel rootRel) {
        Expression<?> offset;
        Expression<?> fetch;
        RelNode input = rootRel.getInput();
        if (input instanceof SortPhysicalRel) {
            SortPhysicalRel sortRel = (SortPhysicalRel)input;
            fetch = sortRel.fetch == null ? ConstantExpression.create(Long.MAX_VALUE, QueryDataType.BIGINT) : sortRel.fetch(this.parameterMetadata);
            offset = sortRel.offset == null ? ConstantExpression.create(0L, QueryDataType.BIGINT) : sortRel.offset(this.parameterMetadata);
            if (sortRel.collation.getFieldCollations().isEmpty()) {
                input = sortRel.getInput();
            }
        } else {
            fetch = ConstantExpression.create(Long.MAX_VALUE, QueryDataType.BIGINT);
            offset = ConstantExpression.create(0L, QueryDataType.BIGINT);
        }
        Vertex vertex = this.dag.newUniqueVertex("ClientSink", RootResultConsumerSink.rootResultConsumerSink(rootRel.getInitiatorAddress(), fetch, offset));
        this.connectInput(input, vertex, edge -> edge.distributeTo(this.localMemberAddress).allToOne(""));
        return vertex;
    }

    public DAG getDag() {
        return this.dag;
    }

    public Set<PlanObjectKey> getObjectKeys() {
        return this.objectKeys;
    }

    private Vertex connectInput(RelNode inputRel, Vertex thisVertex, @Nullable Consumer<Edge> configureEdgeFn) {
        Vertex inputVertex = ((PhysicalRel)inputRel).accept(this);
        Edge edge = Edge.between(inputVertex, thisVertex);
        if (configureEdgeFn != null) {
            configureEdgeFn.accept(edge);
        }
        this.dag.edge(edge);
        return inputVertex;
    }

    private void connectInputPreserveCollation(SingleRel rel, Vertex vertex) {
        boolean preserveCollation = rel.getTraitSet().getCollation().getFieldCollations().size() > 0;
        Vertex inputVertex = this.connectInput(rel.getInput(), vertex, preserveCollation ? Edge::isolated : null);
        if (preserveCollation) {
            int cooperativeThreadCount = this.nodeEngine.getConfig().getJetConfig().getInstanceConfig().getCooperativeThreadCount();
            int explicitLP = inputVertex.determineLocalParallelism(cooperativeThreadCount);
            inputVertex.determineLocalParallelism(explicitLP);
            vertex.localParallelism(explicitLP);
        }
    }

    private void collectObjectKeys(Table table) {
        PlanObjectKey objectKey = table.getObjectKey();
        if (objectKey != null) {
            this.objectKeys.add(objectKey);
        }
    }
}

