/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.kafka.KafkaProcessors;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.jet.sql.impl.connector.HazelcastRexNode;
import com.hazelcast.jet.sql.impl.connector.SqlConnector;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaTable;
import com.hazelcast.jet.sql.impl.connector.kafka.RowProjectorProcessorSupplier;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadata;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadataAvroResolver;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadataJavaResolver;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadataJsonResolver;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadataNullResolver;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadataResolver;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvMetadataResolvers;
import com.hazelcast.jet.sql.impl.connector.keyvalue.KvProcessors;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.QueryUtils;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.sql.impl.schema.ConstantTableStatistics;
import com.hazelcast.sql.impl.schema.MappingField;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.schema.TableField;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class KafkaSqlConnector
implements SqlConnector {
    public static final String TYPE_NAME = "Kafka";
    public static final String OPTION_BOOTSTRAP_SERVERS = "bootstrap.servers";
    public static final String OPTION_OFFSET_RESET = "auto.offset.reset";
    private static final KvMetadataResolvers METADATA_RESOLVERS = new KvMetadataResolvers(new KvMetadataResolver[]{KvMetadataNullResolver.INSTANCE, KvMetadataJavaResolver.INSTANCE, KvMetadataJsonResolver.INSTANCE, KvMetadataAvroResolver.INSTANCE}, new KvMetadataResolver[]{KvMetadataJavaResolver.INSTANCE, KvMetadataJsonResolver.INSTANCE, KvMetadataAvroResolver.INSTANCE});

    @Override
    public String typeName() {
        return TYPE_NAME;
    }

    @Override
    @Nonnull
    public String defaultObjectType() {
        return "Topic";
    }

    @Override
    @Nonnull
    public List<MappingField> resolveAndValidateFields(@Nonnull NodeEngine nodeEngine, @Nonnull SqlConnector.SqlExternalResource externalResource, @Nonnull List<MappingField> userFields) {
        if (externalResource.externalName().length > 1) {
            throw QueryException.error((String)("Invalid external name " + QueryUtils.quoteCompoundIdentifier(externalResource.externalName()) + ", external name for Kafka is allowed to have only a single component referencing the topic name"));
        }
        return METADATA_RESOLVERS.resolveAndValidateFields(userFields, externalResource.options(), nodeEngine);
    }

    @Override
    @Nonnull
    public Table createTable(@Nonnull NodeEngine nodeEngine, @Nonnull String schemaName, @Nonnull String mappingName, @Nonnull SqlConnector.SqlExternalResource externalResource, @Nonnull List<MappingField> resolvedFields) {
        KvMetadata keyMetadata = METADATA_RESOLVERS.resolveMetadata(true, resolvedFields, externalResource.options(), null);
        KvMetadata valueMetadata = METADATA_RESOLVERS.resolveMetadata(false, resolvedFields, externalResource.options(), null);
        List<TableField> fields = Stream.concat(keyMetadata.getFields().stream(), valueMetadata.getFields().stream()).collect(Collectors.toList());
        return new KafkaTable(this, schemaName, mappingName, fields, new ConstantTableStatistics(0L), externalResource.externalName()[0], externalResource.dataConnection(), externalResource.options(), keyMetadata.getQueryTargetDescriptor(), keyMetadata.getUpsertTargetDescriptor(), valueMetadata.getQueryTargetDescriptor(), valueMetadata.getUpsertTargetDescriptor(), externalResource.objectType());
    }

    @Override
    @Nonnull
    public Vertex fullScanReader(@Nonnull SqlConnector.DagBuildContext context, @Nullable HazelcastRexNode predicate, @Nonnull List<HazelcastRexNode> projection, @Nullable List<Map<String, Expression<?>>> partitionPruningCandidates, @Nullable FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider) {
        KafkaTable table = (KafkaTable)context.getTable();
        return context.getDag().newUniqueVertex(table.toString(), ProcessorMetaSupplier.of((int)table.preferredLocalParallelism(), (ProcessorSupplier)new RowProjectorProcessorSupplier(table.kafkaConsumerProperties(), table.dataConnectionName(), table.topicName(), eventTimePolicyProvider, table.paths(), table.types(), table.keyQueryDescriptor(), table.valueQueryDescriptor(), context.convertFilter(predicate), context.convertProjection(projection))));
    }

    @Override
    @Nonnull
    public SqlConnector.VertexWithInputConfig insertProcessor(@Nonnull SqlConnector.DagBuildContext context) {
        return new SqlConnector.VertexWithInputConfig(this.writeProcessor(context));
    }

    @Override
    @Nonnull
    public Vertex sinkProcessor(@Nonnull SqlConnector.DagBuildContext context) {
        return this.writeProcessor(context);
    }

    @Override
    public boolean supportsExpression(@Nonnull HazelcastRexNode expression) {
        return true;
    }

    @Nonnull
    private Vertex writeProcessor(@Nonnull SqlConnector.DagBuildContext context) {
        KafkaTable table = (KafkaTable)context.getTable();
        Vertex vStart = context.getDag().newUniqueVertex("Project(" + table + ")", KvProcessors.entryProjector(table.paths(), table.types(), table.keyUpsertDescriptor(), table.valueUpsertDescriptor(), false));
        vStart.localParallelism(1);
        Vertex vEnd = context.getDag().newUniqueVertex(table.toString(), table.dataConnectionName() == null ? KafkaProcessors.writeKafkaP((Properties)table.kafkaProducerProperties(), (String)table.topicName(), Map.Entry::getKey, Map.Entry::getValue, (boolean)true) : KafkaProcessors.writeKafkaP((DataConnectionRef)new DataConnectionRef(table.dataConnectionName()), (Properties)table.kafkaProducerProperties(), (String)table.topicName(), Map.Entry::getKey, Map.Entry::getValue, (boolean)true));
        context.getDag().edge(Edge.between((Vertex)vStart, (Vertex)vEnd));
        return vStart;
    }

    @Override
    public Set<String> nonSensitiveConnectorOptions() {
        Set<String> set = SqlConnector.super.nonSensitiveConnectorOptions();
        set.addAll(Set.of(OPTION_BOOTSTRAP_SERVERS, OPTION_OFFSET_RESET));
        return set;
    }
}

