/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.mongodb;

import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.mongodb.WriteMode;
import com.hazelcast.jet.mongodb.impl.Mappers;
import com.hazelcast.jet.mongodb.impl.MongoUtilities;
import com.hazelcast.jet.sql.impl.connector.HazelcastRexNode;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
import com.hazelcast.jet.sql.impl.connector.mongodb.DeleteProcessorSupplier;
import com.hazelcast.jet.sql.impl.connector.mongodb.InsertProcessorSupplier;
import com.hazelcast.jet.sql.impl.connector.mongodb.MongoSqlConnectorBase;
import com.hazelcast.jet.sql.impl.connector.mongodb.MongoTable;
import com.hazelcast.jet.sql.impl.connector.mongodb.RexToMongoVisitor;
import com.hazelcast.jet.sql.impl.connector.mongodb.UpdateProcessorSupplier;
import com.hazelcast.shaded.org.apache.calcite.rex.RexNode;
import com.hazelcast.sql.impl.schema.Table;
import com.mongodb.client.model.Filters;
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.bson.Document;
import org.bson.conversions.Bson;

public class MongoSqlConnector
extends MongoSqlConnectorBase {
    public static final String TYPE_NAME = "Mongo";

    @Override
    public String typeName() {
        return TYPE_NAME;
    }

    @Override
    @Nonnull
    public SqlConnector.VertexWithInputConfig insertProcessor(@Nonnull SqlConnector.DagBuildContext context) {
        Vertex vertex = context.getDag().newUniqueVertex("Insert(" + ((Table)context.getTable()).getSqlName() + ")", (ProcessorSupplier)new InsertProcessorSupplier((MongoTable)context.getTable(), WriteMode.INSERT_ONLY));
        return new SqlConnector.VertexWithInputConfig(vertex);
    }

    @Override
    @Nonnull
    public Vertex updateProcessor(@Nonnull SqlConnector.DagBuildContext context, @Nonnull List<String> fieldNames, @Nonnull List<HazelcastRexNode> expressions, @Nullable HazelcastRexNode predicate, boolean hasInput) {
        MongoTable table = (MongoTable)context.getTable();
        RexToMongoVisitor visitor = new RexToMongoVisitor();
        List updates = expressions.stream().map(e -> e.unwrap(RexNode.class).accept(visitor)).map(doc -> {
            assert (doc instanceof Serializable);
            return (Serializable)doc;
        }).collect(Collectors.toList());
        if (hasInput) {
            return context.getDag().newUniqueVertex("Update(" + table.getSqlName() + ")", (ProcessorSupplier)new UpdateProcessorSupplier(table, fieldNames, updates, null, hasInput));
        }
        Bson predicateRaw = predicate == null ? Filters.empty() : predicate.unwrap(RexNode.class).accept(visitor);
        Serializable translated = predicateRaw instanceof Bson ? Mappers.bsonToDocument((Bson)predicateRaw) : (Serializable)predicateRaw;
        return context.getDag().newUniqueVertex("Update(" + table.getSqlName() + ")", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)new UpdateProcessorSupplier(table, fieldNames, updates, translated, hasInput)));
    }

    @Override
    @Nonnull
    public Vertex sinkProcessor(@Nonnull SqlConnector.DagBuildContext context) {
        MongoTable table = (MongoTable)context.getTable();
        return context.getDag().newUniqueVertex("Sink(" + table.getSqlName() + ")", (ProcessorSupplier)new InsertProcessorSupplier(table, WriteMode.UPSERT));
    }

    @Override
    @Nonnull
    public Vertex deleteProcessor(@Nonnull SqlConnector.DagBuildContext context, @Nullable HazelcastRexNode predicate, boolean hasInput) {
        Object predicateToSend;
        Document predicateTranslated;
        MongoTable table = (MongoTable)context.getTable();
        if (hasInput) {
            return context.getDag().newUniqueVertex("Delete(" + table.getSqlName() + ")", (ProcessorSupplier)new DeleteProcessorSupplier(table, null, hasInput));
        }
        Object object = predicateTranslated = predicate == null ? MongoUtilities.UPDATE_ALL_PREDICATE : predicate.unwrap(RexNode.class).accept(new RexToMongoVisitor());
        if (predicateTranslated instanceof Bson) {
            predicateToSend = Mappers.bsonToDocument((Bson)((Bson)predicateTranslated));
        } else {
            assert (predicateTranslated instanceof Serializable);
            predicateToSend = (Serializable)predicateTranslated;
        }
        return context.getDag().newUniqueVertex("Delete(" + table.getSqlName() + ")", ProcessorMetaSupplier.forceTotalParallelismOne((ProcessorSupplier)new DeleteProcessorSupplier(table, (Serializable)predicateToSend, hasInput)));
    }
}

