/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.datastore.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.KindExpression;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.protobuf.Int32Value;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.datastore.source.DatastoreInputFormatProvider;
import io.cdap.plugin.gcp.datastore.source.DatastoreSourceConfig;
import io.cdap.plugin.gcp.datastore.source.EntityToRecordTransformer;
import io.cdap.plugin.gcp.datastore.util.DatastoreUtil;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch source plugin that reads entities of a single Kind from Google Cloud Datastore and
 * emits them as {@link StructuredRecord}s.
 *
 * <p>When no output schema is configured and the plugin is allowed to connect, the schema is
 * inferred at configure time from one sample entity of the configured Kind.
 */
@Plugin(type="batchsource")
@Name(value="Datastore")
@Description(value="Google Cloud Datastore is a NoSQL document database built for automatic scaling and high performance. Source plugin provides ability to read data from it by Kind with various filters usage.")
public class DatastoreSource
extends BatchSource<NullWritable, Entity, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(DatastoreSource.class);
    public static final String NAME = "Datastore";

    /** Datastore value types that map directly onto a single, non-nested CDAP schema. */
    private static final Map<Value.ValueTypeCase, Schema> SUPPORTED_SIMPLE_TYPES =
        new ImmutableMap.Builder<Value.ValueTypeCase, Schema>()
            .put(Value.ValueTypeCase.STRING_VALUE, Schema.of(Schema.Type.STRING))
            .put(Value.ValueTypeCase.INTEGER_VALUE, Schema.of(Schema.Type.LONG))
            .put(Value.ValueTypeCase.DOUBLE_VALUE, Schema.of(Schema.Type.DOUBLE))
            .put(Value.ValueTypeCase.BOOLEAN_VALUE, Schema.of(Schema.Type.BOOLEAN))
            .put(Value.ValueTypeCase.TIMESTAMP_VALUE, Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))
            .put(Value.ValueTypeCase.BLOB_VALUE, Schema.of(Schema.Type.BYTES))
            .put(Value.ValueTypeCase.NULL_VALUE, Schema.of(Schema.Type.NULL))
            .build();

    private final DatastoreSourceConfig config;
    private EntityToRecordTransformer entityToRecordTransformer;

    /**
     * Creates the source with the given plugin configuration.
     *
     * @param config the plugin configuration
     */
    public DatastoreSource(DatastoreSourceConfig config) {
        this.config = config;
    }

    /**
     * Validates the configuration and sets the stage output schema.
     *
     * <p>The configured schema is used when present. If it is absent and the config allows
     * connecting, the schema is inferred from Datastore via {@link #getSchema(FailureCollector)}.
     *
     * @param pipelineConfigurer the configurer providing the stage configurer and failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        LOG.debug("Validate config during `configurePipeline` stage: {}", config);
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();
        config.validate(collector);
        // Fail fast on validation errors so the schema lookup below does not report them again.
        collector.getOrThrowException();

        Schema outputSchema = config.getSchema(collector);
        if (config.shouldConnect() && outputSchema == null) {
            outputSchema = getSchema(collector);
        }
        stageConfigurer.setOutputSchema(outputSchema);
    }

    /**
     * Re-validates the configuration, registers the Datastore input format for the run and
     * records lineage for the fields of the output schema.
     *
     * @param batchSourceContext the context of the batch run being prepared
     */
    @Override
    public void prepareRun(BatchSourceContext batchSourceContext) {
        LOG.debug("Validate config during `prepareRun` stage: {}", config);
        FailureCollector collector = batchSourceContext.getFailureCollector();
        config.validate(collector);
        collector.getOrThrowException();

        String project = config.getProject();
        String serviceAccount = config.getServiceAccount();
        String namespace = config.getNamespace();
        String kind = config.getKind();
        String pbQuery = config.constructPbQuery(collector).toString();
        String splits = String.valueOf(config.getNumSplits());
        InputFormatProvider inputFormatProvider = new DatastoreInputFormatProvider(
            project, serviceAccount, config.isServiceAccountFilePath(), namespace, kind, pbQuery, splits);
        batchSourceContext.setInput(Input.of(config.getReferenceName(), inputFormatProvider));

        Schema schema = batchSourceContext.getOutputSchema();
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext) batchSourceContext, config.getReferenceName());
        lineageRecorder.createExternalDataset(schema);
        List<String> fieldNames = Objects.requireNonNull(schema.getFields()).stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        lineageRecorder.recordRead("Read", "Read from Cloud Datastore.", fieldNames);
    }

    /**
     * Creates the entity-to-record transformer from the runtime output schema and the
     * configured key type and key alias.
     *
     * @param context the runtime context of this stage
     * @throws Exception if the parent initialization fails
     */
    @Override
    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        entityToRecordTransformer = new EntityToRecordTransformer(
            context.getOutputSchema(),
            config.getKeyType(context.getFailureCollector()),
            config.getKeyAlias());
    }

    /**
     * Converts one Datastore entity into a structured record and emits it.
     *
     * @param input the key/value pair whose value is the entity to convert
     * @param emitter the emitter receiving the resulting record
     */
    @Override
    public void transform(KeyValue<NullWritable, Entity> input, Emitter<StructuredRecord> emitter) {
        Entity entity = input.getValue();
        StructuredRecord record = entityToRecordTransformer.transformEntity(entity);
        emitter.emit(record);
    }

    /**
     * Infers the output schema from a single sample entity of the configured Kind.
     *
     * <p>Runs a query limited to one result, filtered by the configured ancestor when one is
     * present, and builds the schema from the properties of the returned entity.
     *
     * @param collector the collector that receives a failure when the query fails or returns nothing
     * @return the schema derived from the sample entity
     */
    private Schema getSchema(FailureCollector collector) {
        Query.Builder queryBuilder = Query.newBuilder()
            .addKind(KindExpression.newBuilder().setName(config.getKind()).build())
            .setLimit(Int32Value.of(1));
        Key ancestorKey = constructAncestorKey(config, collector);
        if (ancestorKey != null) {
            queryBuilder.setFilter(DatastoreHelper.makeAncestorFilter(ancestorKey).build());
        }
        Query query = queryBuilder.build();
        LOG.debug("Executing query for `Get Schema`: {}", query);

        Datastore datastore = DatastoreUtil.getDatastoreV1(
            config.getServiceAccount(), config.isServiceAccountFilePath(), config.getProject());
        RunQueryRequest request = RunQueryRequest.newBuilder()
            .setQuery(query)
            .setPartitionId(PartitionId.newBuilder()
                .setNamespaceId(config.getNamespace())
                .setProjectId(config.getProject()))
            .build();

        Iterator<EntityResult> results;
        try {
            results = datastore.runQuery(request).getBatch().getEntityResultsList().iterator();
        } catch (DatastoreException e) {
            collector.addFailure("Unable to fetch data from Datastore: " + e.getMessage(), null)
                .withStacktrace(e.getStackTrace());
            throw collector.getOrThrowException();
        }

        if (results.hasNext()) {
            Entity entity = results.next().getEntity();
            return constructSchema(entity, config.isIncludeKey(collector), config.getKeyAlias());
        }

        collector.addFailure("Cloud Datastore query did not return any results. ", "Ensure Namespace, Kind and Ancestor properties are correct.")
            .withConfigProperty("namespace")
            .withConfigProperty("kind")
            .withConfigProperty("ancestor");
        throw collector.getOrThrowException();
    }

    /**
     * Builds the Datastore key for the configured ancestor path.
     *
     * @param config the configuration supplying the ancestor path, project and namespace
     * @param collector the collector passed to the config when the ancestor is parsed
     * @return the ancestor key, or {@code null} when no ancestor is configured
     */
    @Nullable
    @VisibleForTesting
    Key constructAncestorKey(DatastoreSourceConfig config, FailureCollector collector) {
        List<Key.PathElement> ancestor = config.getAncestor(collector);
        // Only an empty path means "no ancestor". A single-element path is a valid root-level
        // ancestor and must still produce a key, otherwise the ancestor filter is silently dropped.
        if (ancestor.isEmpty()) {
            return null;
        }

        int lastIndex = ancestor.size() - 1;
        Key.PathElement keyElement = ancestor.get(lastIndex);
        Key.Builder keyBuilder = Key.newBuilder()
            .setPartitionId(PartitionId.newBuilder()
                .setProjectId(config.getProject())
                .setNamespaceId(config.getNamespace()));

        // All elements before the last are added unchanged; empty for a single-element path.
        ancestor.subList(0, lastIndex).forEach(keyBuilder::addPath);

        // The last element is rebuilt from its kind plus either its numeric id or its name.
        if (keyElement.getIdTypeCase() == Key.PathElement.IdTypeCase.ID) {
            keyBuilder.addPath(Key.PathElement.newBuilder()
                .setId(keyElement.getId())
                .setKind(keyElement.getKind())
                .build());
        } else {
            keyBuilder.addPath(Key.PathElement.newBuilder()
                .setName(keyElement.getName())
                .setKind(keyElement.getKind())
                .build());
        }
        return keyBuilder.build();
    }

    /**
     * Builds a record schema named {@code "schema"} from the properties of the given entity.
     *
     * @param entity the entity whose properties define the fields
     * @param isIncludeKey whether to append a non-nullable string field for the entity key
     * @param keyName the name of the key field, used only when {@code isIncludeKey} is true
     * @return the record schema for the entity
     */
    @VisibleForTesting
    Schema constructSchema(Entity entity, boolean isIncludeKey, String keyName) {
        List<Schema.Field> fields = constructSchemaFields(entity);
        if (isIncludeKey) {
            fields.add(Schema.Field.of(keyName, Schema.of(Schema.Type.STRING)));
        }
        return Schema.recordOf("schema", fields);
    }

    /**
     * Converts every property of the entity into a schema field, dropping properties whose
     * value type is not supported.
     */
    private List<Schema.Field> constructSchemaFields(Entity entity) {
        return entity.getPropertiesMap().entrySet().stream()
            .map(entry -> transformToField(entry.getKey(), entry.getValue()))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    /**
     * Creates the schema field for one property.
     *
     * @return a field of NULL type when the value is null, a nullable field for any other
     *     supported type, or {@code null} when the value type is not supported
     */
    private Schema.Field transformToField(String name, Value value) {
        Schema schema = createSchema(name, value);
        if (schema == null) {
            return null;
        }
        if (Schema.Type.NULL == schema.getType()) {
            return Schema.Field.of(name, schema);
        }
        return Schema.Field.of(name, Schema.nullableOf(schema));
    }

    /**
     * Derives the schema for a single Datastore value.
     *
     * <p>Simple types come from {@link #SUPPORTED_SIMPLE_TYPES}. Entity values become nested
     * records named after the property. Array values become arrays whose component schema is
     * derived from the distinct element schemas.
     *
     * @param name the property name, used as the nested record name and in log messages
     * @param value the Datastore value to inspect
     * @return the schema, or {@code null} when the value (or any array element) has an
     *     unsupported type
     */
    private Schema createSchema(String name, Value value) {
        Schema simpleSchema = SUPPORTED_SIMPLE_TYPES.get(value.getValueTypeCase());
        if (simpleSchema != null) {
            return simpleSchema;
        }

        switch (value.getValueTypeCase()) {
            case ENTITY_VALUE: {
                List<Schema.Field> fields = constructSchemaFields(value.getEntityValue());
                return Schema.recordOf(name, fields);
            }
            case ARRAY_VALUE: {
                List<Value> values = value.getArrayValue().getValuesList();
                HashSet<Schema> arraySchemas = new HashSet<>();
                for (Value element : values) {
                    Schema elementSchema = createSchema(name, element);
                    if (elementSchema == null) {
                        // One unsupported element makes the whole array unsupported.
                        return null;
                    }
                    arraySchemas.add(elementSchema);
                }

                if (arraySchemas.isEmpty()) {
                    return Schema.arrayOf(Schema.of(Schema.Type.NULL));
                }

                if (arraySchemas.size() == 1) {
                    Schema componentSchema = arraySchemas.iterator().next();
                    return Schema.Type.NULL == componentSchema.getType()
                        ? Schema.arrayOf(componentSchema)
                        : Schema.arrayOf(Schema.nullableOf(componentSchema));
                }

                LOG.debug("Field '{}' has several schemas in array, add them as union of schemas plus {} schema for null values", name, Schema.Type.NULL);
                arraySchemas.add(Schema.of(Schema.Type.NULL));
                return Schema.arrayOf(Schema.unionOf(arraySchemas));
            }
            default:
                break;
        }

        LOG.debug("Field '{}' is of unsupported type '{}', skipping field from the schema", name, value.getValueTypeCase());
        return null;
    }
}

