/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.spanner.source;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Instance;
import com.google.cloud.spanner.InstanceConfig;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReplicaInfo;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.Type;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.gcp.common.Schemas;
import io.cdap.plugin.gcp.spanner.common.SpannerUtil;
import io.cdap.plugin.gcp.spanner.source.ResultSetToRecordTransformer;
import io.cdap.plugin.gcp.spanner.source.SpannerInputFormat;
import io.cdap.plugin.gcp.spanner.source.SpannerSourceConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="batchsource")
@Name(value="Spanner")
@Description(value="Batch source to read from Cloud Spanner. Cloud Spanner is a fully managed, mission-critical, relational database service that offers transactional consistency at global scale, schemas, SQL (ANSI 2011 with extensions), and automatic, synchronous replication for high availability.")
@Metadata(properties={@MetadataProperty(key="connector", value="Spanner")})
public class SpannerSource
extends BatchSource<NullWritable, ResultSet, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerSource.class);
    private static final String TABLE_NAME = "TableName";
    private static final Statement.Builder SCHEMA_STATEMENT_BUILDER = Statement.newBuilder((String)String.format("SELECT  t.column_name,t.spanner_type, t.is_nullable FROM information_schema.columns AS t WHERE   t.table_catalog = ''  AND  t.table_schema = '' AND t.table_name = @%s", "TableName"));
    private static final String LIMIT = "limit";
    public static final String NAME = "Spanner";
    private final SpannerSourceConfig config;
    private Schema schema;
    private ResultSetToRecordTransformer transformer;

    /**
     * Creates the source around the given plugin configuration.
     *
     * @param config configuration object; stored as-is and read by the other methods of this class
     */
    public SpannerSource(SpannerSourceConfig config) {
        this.config = config;
    }

    /**
     * Validates the configuration and publishes the stage's output schema.
     *
     * <p>When the table schema cannot be looked up (no connection possible, no project, or a
     * file-path service account that is unavailable) the schema taken from the configuration is
     * published unchanged. Otherwise the schema is fetched via {@code getSchema}; if the
     * configuration also carries a schema, the two are checked against each other and the
     * configured one is published, else the fetched one is.
     *
     * <p>A {@link SpannerException} raised during the lookup is reported through the failure
     * collector with a fixed message instead of being propagated.
     *
     * @param pipelineConfigurer configurer providing the stage configurer and failure collector
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        StageConfigurer stage = pipelineConfigurer.getStageConfigurer();
        FailureCollector failures = stage.getFailureCollector();
        this.config.validate(failures);
        Schema declaredSchema = this.config.getSchema(failures);

        // Same short-circuit order as the negated form: connectivity, then project, then service account.
        boolean lookupPossible = this.config.canConnect()
            && this.config.tryGetProject() != null
            && !(this.config.isServiceAccountFilePath().booleanValue() && this.config.autoServiceAccountUnavailable());
        if (!lookupPossible) {
            stage.setOutputSchema(declaredSchema);
            return;
        }

        try {
            Schema tableSchema = this.getSchema(failures);
            if (declaredSchema != null) {
                Schemas.validateFieldsMatch(tableSchema, declaredSchema, failures);
                stage.setOutputSchema(declaredSchema);
            } else {
                stage.setOutputSchema(tableSchema);
            }
        } catch (SpannerException e) {
            failures
                .addFailure("Unable to connect to spanner instance.", "Verify spanner configurations such as instance, database, table, project, etc.")
                .withStacktrace(e.getStackTrace());
        }
    }

    /**
     * Prepares a batch run: validates the configuration, partitions the import query and registers
     * the input format together with lineage information.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Validate the configuration and, when a schema is configured, check it against the
     *       schema returned by {@code getSchema}; abort if any failure was collected.</li>
     *   <li>Populate a Hadoop {@link Configuration} via {@code initializeConfig}.</li>
     *   <li>Open a batch read-only transaction bound to the pipeline's logical start time,
     *       partition the import query, and store the serialized transaction id and partition
     *       list in the configuration.</li>
     *   <li>Read the location of the first replica of the instance's configuration and use it for
     *       the lineage {@link Asset}.</li>
     *   <li>Register {@link SpannerInputFormat} as the input and record the read fields.</li>
     * </ol>
     *
     * @param batchSourceContext context of the batch run
     * @throws Exception if validation fails or any Spanner/serialization call fails
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        FailureCollector collector = batchSourceContext.getFailureCollector();
        this.config.validate(collector);
        Schema actualSchema = this.getSchema(collector);
        Schema configuredSchema = this.config.getSchema(collector);
        if (configuredSchema != null) {
            Schemas.validateFieldsMatch(actualSchema, configuredSchema, collector);
        }
        collector.getOrThrowException();

        String projectId = this.config.getProject();
        Configuration configuration = new Configuration();
        this.initializeConfig(configuration, projectId);

        String location;
        try (Spanner spanner = SpannerUtil.getSpannerService(this.config.getServiceAccount(), this.config.isServiceAccountFilePath(), projectId)) {
            BatchClient batchClient = spanner.getBatchClient(DatabaseId.of(projectId, this.config.instance, this.config.database));

            // The logical start time is in milliseconds; Spanner's Timestamp factory takes microseconds.
            Timestamp logicalStartTime = Timestamp.ofTimeMicroseconds(TimeUnit.MILLISECONDS.toMicros(batchSourceContext.getLogicalStartTime()));
            BatchReadOnlyTransaction batchReadOnlyTransaction = batchClient.batchReadOnlyTransaction(TimestampBound.ofReadTimestamp(logicalStartTime));
            BatchTransactionId batchTransactionId = batchReadOnlyTransaction.getBatchTransactionId();

            String importQuery = Strings.isNullOrEmpty(this.config.importQuery) ? String.format("Select * from %s;", this.config.table) : this.config.importQuery;
            // Copied into a concrete ArrayList (as before) because this exact object is what gets serialized.
            ArrayList<Partition> partitions = new ArrayList<>(batchReadOnlyTransaction.partitionQuery(this.getPartitionOptions(), Statement.of(importQuery)));

            configuration.set("spanner.batch.transaction.id", this.getSerializedObjectString(batchTransactionId));
            configuration.set("partitions.list", this.getSerializedObjectString(partitions));

            Instance spannerInstance = spanner.getInstanceAdminClient().getInstance(this.config.instance);
            InstanceConfig instanceConfig = spanner.getInstanceAdminClient().getInstanceConfig(spannerInstance.getInstanceConfigId().getInstanceConfig());
            // NOTE(review): assumes the instance config lists at least one replica; get(0) throws otherwise.
            location = instanceConfig.getReplicas().get(0).getLocation();
        }

        Asset asset = Asset.builder(this.config.getReferenceName()).setFqn(this.config.getFQN()).setLocation(location).build();
        LineageRecorder lineageRecorder = new LineageRecorder(batchSourceContext, asset);
        lineageRecorder.createExternalDataset(configuredSchema);
        batchSourceContext.setInput(Input.of(this.config.getReferenceName(), new SourceInputFormatProvider(SpannerInputFormat.class, configuration)));

        this.schema = batchSourceContext.getOutputSchema();
        if (this.schema != null && this.schema.getFields() != null) {
            lineageRecorder.recordRead("Read", "Read from Spanner table.", this.schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
        }
    }

    /**
     * Runtime initialization: resolves the record schema and builds the row transformer.
     *
     * <p>The output schema of the runtime context is used when present; otherwise the schema is
     * obtained through {@code getSchema}.
     *
     * @param context runtime context of the batch stage
     * @throws Exception if the parent initialization or the schema lookup fails
     */
    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        Schema resolved = context.getOutputSchema();
        if (resolved == null) {
            resolved = this.getSchema(context.getFailureCollector());
        }
        this.schema = resolved;
        this.transformer = new ResultSetToRecordTransformer(this.schema);
    }

    /**
     * Converts the value of one input pair with the configured transformer and emits the result.
     * The key of the pair is not used.
     *
     * @param input key/value pair whose value is the Spanner result set to convert
     * @param emitter receiver of the converted record
     */
    public void transform(KeyValue<NullWritable, ResultSet> input, Emitter<StructuredRecord> emitter) {
        ResultSet row = input.getValue();
        // No (Object) cast: the emitter is typed to StructuredRecord, so widening the argument would not compile.
        emitter.emit(this.transformer.transform(row));
    }

    /**
     * Copies the connection settings and the query into the given Hadoop configuration.
     * Entries whose value is {@code null} are left unset (see {@code setIfValueNotNull}).
     *
     * @param configuration configuration to populate
     * @param projectId project id stored under {@code project.id}
     */
    private void initializeConfig(Configuration configuration, String projectId) {
        this.setIfValueNotNull(configuration, "project.id", projectId);

        String serviceAccountType;
        if (this.config.isServiceAccountFilePath()) {
            serviceAccountType = "serviceFilePath";
        } else {
            serviceAccountType = "serviceJson";
        }
        this.setIfValueNotNull(configuration, "service.account.type", serviceAccountType);
        this.setIfValueNotNull(configuration, "service.account.path", this.config.getServiceAccount());
        this.setIfValueNotNull(configuration, "instance.id", this.config.instance);
        this.setIfValueNotNull(configuration, "database.name", this.config.database);

        // Without an explicit import query, fall back to selecting the whole table.
        String query;
        if (Strings.isNullOrEmpty(this.config.importQuery)) {
            query = String.format("Select * from %s;", this.config.table);
        } else {
            query = this.config.importQuery;
        }
        this.setIfValueNotNull(configuration, "query", query);
    }

    /**
     * Stores {@code value} under {@code key} in the configuration, doing nothing when the value is
     * {@code null}.
     *
     * @param configuration configuration to write to
     * @param key property name
     * @param value property value, may be {@code null}
     */
    private void setIfValueNotNull(Configuration configuration, String key, String value) {
        if (value == null) {
            return;
        }
        configuration.set(key, value);
    }

    /**
     * Placeholder for a method whose body the decompiler (CFR) could not reconstruct.
     *
     * <p>As written, this method never returns: it unconditionally throws
     * {@link IllegalStateException}. {@code prepareRun} calls it for the batch transaction id and
     * for the partition list and stores the returned strings in the Hadoop configuration, so
     * {@code prepareRun} cannot complete until the real implementation is restored.
     *
     * <p>NOTE(review): judging by the name and the callers, the original presumably serialized
     * {@code object} into a string form; the exact encoding is not recoverable from this file and
     * must match whatever the reader of those configuration entries expects. Restore it from the
     * original sources rather than guessing.
     *
     * @param object object to serialize (unused by this placeholder)
     * @return never returns normally in this placeholder
     * @throws IOException declared by the original signature; not thrown by this placeholder
     */
    private String getSerializedObjectString(Object object) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        // Always throws: the real body was lost during decompilation.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Builds the partition options from the optional configuration values.
     * A value that is {@code null} leaves the corresponding option at the builder's default.
     *
     * @return partition options for partitioning the import query
     */
    private PartitionOptions getPartitionOptions() {
        PartitionOptions.Builder options = PartitionOptions.newBuilder();
        if (this.config.partitionSizeMB != null) {
            // Configured in megabytes, the builder expects bytes.
            long sizeInBytes = this.config.partitionSizeMB * 1024L * 1024L;
            options.setPartitionSizeBytes(sizeInBytes);
        }
        if (this.config.maxPartitions != null) {
            long partitionLimit = this.config.maxPartitions.longValue();
            options.setMaxPartitions(partitionLimit);
        }
        return options.build();
    }

    /**
     * Placeholder for a method whose body the decompiler (CFR) could not reconstruct.
     *
     * <p>As written, this method never returns: it unconditionally throws
     * {@link IllegalStateException}. It is called from {@code configurePipeline},
     * {@code prepareRun} and {@code initialize}, all of which therefore fail when they reach the
     * schema lookup until the real implementation is restored.
     *
     * <p>NOTE(review): {@code getStatementForOneRow}, {@code parseSchemaFromSpannerType} and
     * {@code getFieldsNullability} have no other caller in this file, so the original body
     * presumably used them to derive the schema from Spanner - confirm against the original
     * sources.
     *
     * @param collector failure collector (unused by this placeholder)
     * @return never returns normally in this placeholder
     */
    private Schema getSchema(FailureCollector collector) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 4 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        // Always throws: the real body was lost during decompilation.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Recognizes a query that contains a {@code LIMIT <digits>} clause, in any letter case.
     * Compiled once; the flags are the named equivalents of the previous numeric value 10.
     */
    private static final Pattern LIMIT_CLAUSE_PATTERN =
        Pattern.compile("^(?:[^;']|(?:'[^']+'))+ LIMIT +\\d+(.*)", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE);

    /**
     * Builds a statement that returns at most one row for the given query.
     *
     * <p>If the query already has a {@code LIMIT} clause, everything from the last occurrence of
     * the keyword to the end of the query is replaced by {@code limit 1}; otherwise
     * {@code " limit 1"} is appended.
     *
     * <p>The keyword is located case-insensitively, matching the case-insensitive pattern. A
     * case-sensitive search failed with {@link StringIndexOutOfBoundsException} for
     * {@code ... LIMIT 10} and cut the query at the wrong place when a lowercase {@code limit}
     * appeared earlier (for example inside a column name).
     *
     * @param importQuery user supplied query; must not be {@code null}
     * @return statement for the one-row variant of the query
     */
    private Statement getStatementForOneRow(String importQuery) {
        String query;
        if (LIMIT_CLAUSE_PATTERN.matcher(importQuery).matches()) {
            // The pattern matched, so the keyword is present and the index is never -1.
            int limitStart = StringUtils.lastIndexOfIgnoreCase(importQuery, LIMIT);
            // Cut at the keyword instead of String.replace, which would rewrite every occurrence of the suffix text.
            query = importQuery.substring(0, limitStart) + "limit 1";
        } else {
            // NOTE(review): a query ending in ';' yields "...; limit 1" here - confirm whether such input can occur.
            query = String.format("%s limit 1", importQuery);
        }
        return Statement.newBuilder(query).build();
    }

    /**
     * Maps a Spanner column type to the corresponding CDAP schema.
     *
     * <p>For an {@code ARRAY} type the element type is mapped and wrapped in an array schema; any
     * other type is mapped directly. When the (element) type has no mapping, a failure naming the
     * column and the full Spanner type is added to the collector and {@code null} is returned.
     *
     * @param spannerType Spanner type of the column
     * @param columnName column name, used only in the failure message
     * @param collector collector that receives the failure for an unsupported type
     * @return the schema for the type, or {@code null} if the type is not supported
     */
    @Nullable
    Schema parseSchemaFromSpannerType(Type spannerType, String columnName, FailureCollector collector) {
        Type.Code code = spannerType.getCode();
        boolean isArray = code == Type.Code.ARRAY;
        Type.Code valueCode = isArray ? spannerType.getArrayElementType().getCode() : code;

        Schema valueSchema = schemaForScalarCode(valueCode);
        if (valueSchema == null) {
            collector.addFailure(String.format("Column '%s' has unsupported type '%s'.", columnName, spannerType), null);
            return null;
        }
        return isArray ? Schema.arrayOf(valueSchema) : valueSchema;
    }

    /**
     * Returns the schema for a non-array Spanner type code, or {@code null} when the code has no
     * mapping (which includes {@code ARRAY}, so nested arrays are reported as unsupported).
     */
    @Nullable
    private static Schema schemaForScalarCode(Type.Code code) {
        switch (code) {
            case BOOL:
                return Schema.of(Schema.Type.BOOLEAN);
            case INT64:
                return Schema.of(Schema.Type.LONG);
            case FLOAT64:
                return Schema.of(Schema.Type.DOUBLE);
            case STRING:
                return Schema.of(Schema.Type.STRING);
            case BYTES:
                return Schema.of(Schema.Type.BYTES);
            case TIMESTAMP:
                return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
            case DATE:
                return Schema.of(Schema.LogicalType.DATE);
            default:
                return null;
        }
    }

    /**
     * Reads the nullability of every column of the configured table from
     * {@code information_schema.columns}.
     *
     * <p>The result set is closed when the method finishes, including when reading a row fails.
     *
     * @param databaseClient client used to run the metadata query in a single-use read
     * @return map from column name to {@code true} when the column's {@code is_nullable} value is
     *     {@code "YES"}, {@code false} otherwise
     */
    private Map<String, Boolean> getFieldsNullability(DatabaseClient databaseClient) {
        // NOTE(review): bind() mutates the shared static builder; harmless while only this parameter
        // is ever bound from one thread at a time - confirm no concurrent use.
        Statement tableMetadataStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(this.config.table).build();
        Map<String, Boolean> nullableState = new HashMap<>();
        try (ResultSet resultSet = databaseClient.singleUse().executeQuery(tableMetadataStatement)) {
            while (resultSet.next()) {
                String columnName = resultSet.getString("column_name");
                String nullable = resultSet.getString("is_nullable");
                nullableState.put(columnName, "YES".equals(nullable));
            }
        }
        return nullableState;
    }
}

