/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.spanner.common;

import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.auth.Credentials;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.encryption.EncryptionConfigs;
import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.GeneratedMessageV3;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.Mutation;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSet;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.spanner.common.BytesCounter;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpannerUtil {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerUtil.class);

    /** Logical types that {@link #validateSchema} accepts without reporting a failure. */
    private static final Set<Schema.LogicalType> SUPPORTED_LOGICAL_TYPES = ImmutableSet.of(Schema.LogicalType.DATE, Schema.LogicalType.TIMESTAMP_MICROS, Schema.LogicalType.DATETIME);

    /** Name of the query parameter that carries the table name in the information_schema queries. */
    private static final String TABLE_NAME = "TableName";

    /**
     * Builder holding the query that lists column name, Spanner type and nullability for one table.
     * The table name is supplied through the {@link #TABLE_NAME} query parameter.
     */
    private static final Statement.Builder SCHEMA_STATEMENT_BUILDER = Statement.newBuilder(String.format("SELECT  t.column_name,t.spanner_type, t.is_nullable FROM information_schema.columns AS t WHERE   t.table_catalog = ''  AND  t.table_schema = '' AND t.table_name = @%s", TABLE_NAME));

    /**
     * Creates a Spanner service client for the given project.
     *
     * @param serviceAccount service account (file path or content, depending on
     *                       {@code isServiceAccountFilePath}); when {@code null} no credentials are set explicitly
     * @param isServiceAccountFilePath whether {@code serviceAccount} is a file path
     * @param projectId project id set on the client options
     * @return the Spanner service built from those options
     * @throws IOException propagated from building the options (credential loading)
     */
    public static Spanner getSpannerService(String serviceAccount, boolean isServiceAccountFilePath, String projectId) throws IOException {
        return SpannerUtil.buildSpannerOptions(serviceAccount, isServiceAccountFilePath, projectId)
            .build()
            .getService();
    }

    /**
     * Creates a Spanner service client from a connector configuration, using its service account,
     * service-account-kind flag and project.
     *
     * @param config connector configuration to read the connection settings from
     * @return the Spanner service
     * @throws IOException propagated from {@link #getSpannerService(String, boolean, String)}
     */
    public static Spanner getSpannerService(GCPConnectorConfig config) throws IOException {
        String serviceAccount = config.getServiceAccount();
        boolean isFilePath = config.isServiceAccountFilePath();
        String project = config.getProject();
        return SpannerUtil.getSpannerService(serviceAccount, isFilePath, project);
    }

    /**
     * Creates a Spanner service client whose gRPC calls are intercepted so that the serialized size of
     * every received {@link PartialResultSet} or {@link ResultSet} message is added to {@code counter}.
     *
     * @param serviceAccount service account (file path or content); {@code null} to not set credentials explicitly
     * @param isServiceAccountFilePath whether {@code serviceAccount} is a file path
     * @param projectId project id set on the client options
     * @param counter counter incremented with the serialized size of each received result message
     * @return the Spanner service with the read interceptor installed
     * @throws IOException propagated from building the options (credential loading)
     */
    public static Spanner getSpannerServiceWithReadInterceptor(String serviceAccount, boolean isServiceAccountFilePath, String projectId, final BytesCounter counter) throws IOException {
        // Local class so it can capture 'counter' directly; wraps a call and observes inbound messages.
        class InterceptedClientCall<ReqT, RespT> extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
            InterceptedClientCall(ClientCall<ReqT, RespT> call) {
                super(call);
            }

            @Override
            public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
                super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onMessage(RespT message) {
                        if (message instanceof PartialResultSet) {
                            counter.increment(((PartialResultSet) message).getSerializedSize());
                        } else if (message instanceof ResultSet) {
                            counter.increment(((ResultSet) message).getSerializedSize());
                        }
                        // Always forward the message to the real listener after counting.
                        super.onMessage(message);
                    }
                }, headers);
            }
        }

        SpannerOptions.Builder optionsBuilder = SpannerUtil.buildSpannerOptions(serviceAccount, isServiceAccountFilePath, projectId);
        optionsBuilder.setInterceptorProvider(SpannerInterceptorProvider.createDefault().with(new ClientInterceptor() {
            @Override
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
                ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
                return new InterceptedClientCall<>(call);
            }
        }));
        return optionsBuilder.build().getService();
    }

    /**
     * Creates a Spanner service client whose gRPC calls are intercepted so that, for every outgoing
     * {@link CommitRequest}, the serialized size of each {@link Mutation.Write} found among the field
     * values of its mutations is added to {@code counter}.
     *
     * @param serviceAccount service account (file path or content); {@code null} to not set credentials explicitly
     * @param isServiceAccountFilePath whether {@code serviceAccount} is a file path
     * @param projectId project id set on the client options
     * @param counter counter incremented with the serialized size of each written mutation
     * @return the Spanner service with the write interceptor installed
     * @throws IOException propagated from building the options (credential loading)
     */
    public static Spanner getSpannerServiceWithWriteInterceptor(String serviceAccount, boolean isServiceAccountFilePath, String projectId, final BytesCounter counter) throws IOException {
        // Local class so it can capture 'counter' directly; wraps a call and observes outbound messages.
        class InterceptedClientCall<ReqT, RespT> extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
            InterceptedClientCall(ClientCall<ReqT, RespT> call) {
                super(call);
            }

            @Override
            public void sendMessage(ReqT message) {
                if (message instanceof CommitRequest) {
                    CommitRequest commitRequest = (CommitRequest) message;
                    // Look through the set fields of every mutation and count only the Write payloads.
                    commitRequest.getMutationsList().stream()
                        .map(GeneratedMessageV3::getAllFields)
                        .map(Map::values)
                        .flatMap(Collection::stream)
                        .filter(Mutation.Write.class::isInstance)
                        .map(Mutation.Write.class::cast)
                        .map(Mutation.Write::getSerializedSize)
                        .forEach(counter::increment);
                }
                // Always forward the message after counting.
                super.sendMessage(message);
            }
        }

        SpannerOptions.Builder optionsBuilder = SpannerUtil.buildSpannerOptions(serviceAccount, isServiceAccountFilePath, projectId);
        optionsBuilder.setInterceptorProvider(SpannerInterceptorProvider.createDefault().with(new ClientInterceptor() {
            @Override
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
                ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
                return new InterceptedClientCall<>(call);
            }
        }));
        return optionsBuilder.build().getService();
    }

    /**
     * Prepares a Spanner options builder with optional credentials and the project id.
     *
     * @param serviceAccount service account (file path or content); credentials are only set when non-null
     * @param isServiceAccountFilePath whether {@code serviceAccount} is a file path
     * @param projectId project id to set on the builder
     * @return the configured, not yet built, options builder
     * @throws IOException propagated from loading the service account credentials
     */
    private static SpannerOptions.Builder buildSpannerOptions(String serviceAccount, boolean isServiceAccountFilePath, String projectId) throws IOException {
        SpannerOptions.Builder builder = SpannerOptions.newBuilder();
        if (serviceAccount != null) {
            Credentials credentials = GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath);
            builder.setCredentials(credentials);
        }
        // Project id is set after the credentials, in that order, as the original code did.
        builder.setProjectId(projectId);
        return builder;
    }

    /**
     * Checks every field of {@code schema} and reports unsupported field types to {@code collector}.
     *
     * <p>Nullable fields are validated against their non-nullable schema. A field with a logical type
     * is accepted only if that logical type is in {@code SUPPORTED_LOGICAL_TYPES}; a field without a
     * logical type is accepted only if its type is in {@code supportedTypes}. Each failure is attached
     * to the offending output schema field.
     *
     * @param schema schema whose fields are validated
     * @param supportedTypes plain (non-logical) types that are accepted
     * @param collector collector receiving one failure per unsupported field
     */
    public static void validateSchema(Schema schema, Set<Schema.Type> supportedTypes, FailureCollector collector) {
        for (Schema.Field field : schema.getFields()) {
            Schema fieldSchema = field.getSchema();
            if (fieldSchema.isNullable()) {
                fieldSchema = fieldSchema.getNonNullable();
            }

            Schema.LogicalType logicalType = fieldSchema.getLogicalType();
            if (logicalType != null) {
                if (!SUPPORTED_LOGICAL_TYPES.contains(logicalType)) {
                    collector.addFailure(String.format("Field '%s' is of unsupported type '%s'.", field.getName(), fieldSchema.getDisplayName()), "Change the type to be a date, timestamp or datetime.").withOutputSchemaField(field.getName());
                }
                // Logical types are never checked against the plain supported types.
                continue;
            }

            if (supportedTypes.contains(fieldSchema.getType())) {
                continue;
            }
            // Locale.ROOT keeps the lower-cased type names stable regardless of the JVM default locale
            // (e.g. "INT" must not become a dotless-i variant under a Turkish locale).
            String supportedNames = supportedTypes.stream()
                .map(t -> t.name().toLowerCase(java.util.Locale.ROOT))
                .collect(Collectors.joining(", "));
            collector.addFailure(String.format("Field '%s' is of unsupported type '%s'.", field.getName(), fieldSchema.getDisplayName()), String.format("Supported types are: %s, date, datetime and timestamp.", supportedNames)).withOutputSchemaField(field.getName());
        }
    }

    /**
     * Builds a {@code CREATE TABLE} DDL statement for the given table from a record schema.
     *
     * <p>Each field becomes one column; a column is declared {@code NOT NULL} unless the field schema
     * is nullable. Logical types map to {@code DATE}, {@code TIMESTAMP} or {@code STRING(MAX)}
     * (datetime); plain types map to the corresponding Spanner scalar or array type.
     *
     * @param tableName name of the table to create
     * @param primaryKeys text placed verbatim inside {@code PRIMARY KEY (...)}
     * @param schema schema whose fields define the columns
     * @return the DDL statement
     * @throws IllegalStateException if a field has an unsupported type or logical type, or an array
     *         field has no component schema
     */
    public static String convertSchemaToCreateStatement(String tableName, String primaryKeys, Schema schema) {
        StringBuilder createStmt = new StringBuilder();
        createStmt.append("CREATE TABLE ").append(tableName).append(" (");
        boolean hasColumns = false;
        for (Schema.Field field : schema.getFields()) {
            String name = field.getName();
            boolean nullable = field.getSchema().isNullable();
            Schema fieldSchema = nullable ? field.getSchema().getNonNullable() : field.getSchema();

            String spannerType;
            Schema.LogicalType logicalType = fieldSchema.getLogicalType();
            if (logicalType != null) {
                switch (logicalType) {
                    case DATE:
                        spannerType = "DATE";
                        break;
                    case TIMESTAMP_MILLIS:
                    case TIMESTAMP_MICROS:
                        spannerType = "TIMESTAMP";
                        break;
                    case DATETIME:
                        // Datetime values are stored as strings.
                        spannerType = "STRING(MAX)";
                        break;
                    default:
                        throw new IllegalStateException("Logical type '" + logicalType + "' is not supported");
                }
            } else {
                Schema.Type type = fieldSchema.getType();
                switch (type) {
                    case BOOLEAN:
                        spannerType = "BOOL";
                        break;
                    case STRING:
                        spannerType = "STRING(MAX)";
                        break;
                    case INT:
                    case LONG:
                        spannerType = "INT64";
                        break;
                    case FLOAT:
                    case DOUBLE:
                        spannerType = "FLOAT64";
                        break;
                    case BYTES:
                        spannerType = "BYTES(MAX)";
                        break;
                    case ARRAY:
                        Schema componentSchema = fieldSchema.getComponentSchema();
                        if (componentSchema == null) {
                            throw new IllegalStateException("Component schema of field '" + name + "' is null");
                        }
                        spannerType = SpannerUtil.getArrayType(componentSchema);
                        break;
                    default:
                        throw new IllegalStateException(type.name() + " : Type currently not supported.");
                }
            }
            SpannerUtil.addColumn(createStmt, name, nullable, spannerType);
            hasColumns = true;
        }
        // Each column is followed by ", "; strip that trailing separator only if a column was
        // actually written, otherwise the opening " (" would be removed instead.
        if (hasColumns) {
            createStmt.setLength(createStmt.length() - 2);
        }
        createStmt.append(")");
        createStmt.append(" PRIMARY KEY (").append(primaryKeys).append(")");
        return createStmt.toString();
    }

    /**
     * Returns the Spanner {@code ARRAY<...>} column type for an array whose elements have the given schema.
     * A nullable element schema is unwrapped to its non-nullable form first.
     *
     * @param schema component (element) schema of the array
     * @return the Spanner array type, e.g. {@code ARRAY<INT64>}
     * @throws IllegalStateException if the element type or logical type has no array mapping
     */
    private static String getArrayType(Schema schema) {
        Schema elementSchema = schema;
        if (schema.isNullable()) {
            elementSchema = schema.getNonNullable();
        }

        String elementType;
        Schema.LogicalType elementLogicalType = elementSchema.getLogicalType();
        if (elementLogicalType != null) {
            switch (elementLogicalType) {
                case DATE:
                    elementType = "DATE";
                    break;
                case TIMESTAMP_MILLIS:
                case TIMESTAMP_MICROS:
                    elementType = "TIMESTAMP";
                    break;
                default:
                    throw new IllegalStateException("Array of '" + elementLogicalType + "' logical type currently not supported.");
            }
        } else {
            Schema.Type elementKind = elementSchema.getType();
            switch (elementKind) {
                case BOOLEAN:
                    elementType = "BOOL";
                    break;
                case STRING:
                    elementType = "STRING(MAX)";
                    break;
                case INT:
                case LONG:
                    elementType = "INT64";
                    break;
                case FLOAT:
                case DOUBLE:
                    elementType = "FLOAT64";
                    break;
                case BYTES:
                    elementType = "BYTES(MAX)";
                    break;
                default:
                    throw new IllegalStateException("Array of '" + elementKind.name() + "' type currently not supported.");
            }
        }
        return "ARRAY<" + elementType + ">";
    }

    /**
     * Appends one column definition followed by {@code ", "} to the statement being built.
     *
     * @param createStmt statement buffer to append to
     * @param name column name
     * @param isNullable when {@code false}, {@code NOT NULL} is added after the type
     * @param spannerType Spanner column type
     */
    private static void addColumn(StringBuilder createStmt, String name, boolean isNullable, String spannerType) {
        String nullability = isNullable ? "" : " NOT NULL";
        createStmt.append(name)
            .append(" ")
            .append(spannerType)
            .append(nullability)
            .append(", ");
    }

    /**
     * Reads the columns of a Spanner table from {@code information_schema.columns} and converts them
     * into a record schema named {@code outputSchema}.
     *
     * <p>Columns whose Spanner type cannot be mapped are skipped and reported to {@code collector}.
     * A column is nullable in the result when {@code is_nullable} equals {@code "YES"}. If no column
     * could be mapped and the collector holds failures, {@code collector.getOrThrowException()} is invoked.
     *
     * @param spanner Spanner service used to run the query
     * @param projectId project of the database
     * @param instance instance of the database
     * @param database database name
     * @param table table whose columns are read
     * @param collector collector receiving failures for unsupported column types
     * @return record schema with one field per supported column
     */
    public static Schema getTableSchema(Spanner spanner, String projectId, String instance, String database, String table, FailureCollector collector) {
        // Bind on a fresh builder: SCHEMA_STATEMENT_BUILDER is a shared static instance, and binding a
        // parameter on it would mutate state visible to every other caller of this method.
        Statement getTableSchemaStatement = Statement.newBuilder(SCHEMA_STATEMENT_BUILDER.build().getSql())
            .bind(TABLE_NAME).to(table)
            .build();
        DatabaseClient databaseClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instance, database));
        try (com.google.cloud.spanner.ResultSet resultSet = databaseClient.singleUse().executeQuery(getTableSchemaStatement)) {
            ArrayList<Schema.Field> schemaFields = new ArrayList<>();
            while (resultSet.next()) {
                String columnName = resultSet.getString("column_name");
                String spannerType = resultSet.getString("spanner_type");
                boolean isNullable = "YES".equals(resultSet.getString("is_nullable"));
                Schema typeSchema = SpannerUtil.parseSchemaFromSpannerTypeString(columnName, spannerType, collector);
                if (typeSchema == null) {
                    // Unsupported column type; a failure has already been added to the collector.
                    continue;
                }
                Schema fieldSchema = isNullable ? Schema.nullableOf(typeSchema) : typeSchema;
                schemaFields.add(Schema.Field.of(columnName, fieldSchema));
            }
            if (schemaFields.isEmpty() && !collector.getValidationFailures().isEmpty()) {
                collector.getOrThrowException();
            }
            return Schema.recordOf("outputSchema", schemaFields);
        }
    }

    /**
     * Maps a Spanner column type string to a schema.
     *
     * <p>{@code STRING...} and {@code BYTES...} (and their {@code ARRAY<...>} forms) are matched by
     * prefix; all other types are matched exactly. An unmapped type adds a failure to
     * {@code collector} and yields {@code null} instead of throwing.
     *
     * @param columnName column name, used only in failure messages
     * @param spannerType Spanner type string of the column
     * @param collector collector receiving a failure when the type is unsupported
     * @return the mapped schema, or {@code null} when the type is unsupported
     */
    @Nullable
    private static Schema parseSchemaFromSpannerTypeString(String columnName, String spannerType, FailureCollector collector) {
        if (spannerType.startsWith("ARRAY")) {
            if (spannerType.startsWith("ARRAY<STRING")) {
                return Schema.arrayOf(Schema.of(Schema.Type.STRING));
            }
            if (spannerType.startsWith("ARRAY<BYTES")) {
                return Schema.arrayOf(Schema.of(Schema.Type.BYTES));
            }
            switch (spannerType) {
                case "ARRAY<BOOL>":
                    return Schema.arrayOf(Schema.of(Schema.Type.BOOLEAN));
                case "ARRAY<INT64>":
                    return Schema.arrayOf(Schema.of(Schema.Type.LONG));
                case "ARRAY<FLOAT64>":
                    return Schema.arrayOf(Schema.of(Schema.Type.DOUBLE));
                case "ARRAY<DATE>":
                    return Schema.arrayOf(Schema.of(Schema.LogicalType.DATE));
                case "ARRAY<TIMESTAMP>":
                    return Schema.arrayOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS));
                default:
                    collector.addFailure(String.format("Column '%s' is of unsupported type 'array'.", columnName), null);
                    return null;
            }
        }

        if (spannerType.startsWith("STRING")) {
            return Schema.of(Schema.Type.STRING);
        }
        if (spannerType.startsWith("BYTES")) {
            return Schema.of(Schema.Type.BYTES);
        }
        // Match on the type string itself rather than an enum lookup, so a type name with no matching
        // constant is reported through the collector instead of raising IllegalArgumentException.
        switch (spannerType) {
            case "BOOL":
                return Schema.of(Schema.Type.BOOLEAN);
            case "INT64":
                return Schema.of(Schema.Type.LONG);
            case "FLOAT64":
                return Schema.of(Schema.Type.DOUBLE);
            case "DATE":
                return Schema.of(Schema.LogicalType.DATE);
            case "TIMESTAMP":
                return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
            default:
                collector.addFailure(String.format("Column '%s' has unsupported type '%s'.", columnName, spannerType), null);
                return null;
        }
    }

    /**
     * Ensures the configured Spanner database and table exist, creating them when missing.
     * Does nothing when {@code is.preview.enabled} is {@code true} in the configuration.
     *
     * <p>Reads {@code project.id}, {@code instance.id}, {@code database.name},
     * {@code service.account.type}, {@code service.account.path}, {@code table}, {@code keys} and
     * {@code schema} from the configuration. The database is fetched or created first; the table is
     * then created from the schema if it is not present.
     *
     * @param configuration configuration carrying the connection and table settings
     * @throws IllegalArgumentException if the table does not exist and no schema (or no primary key)
     *         is configured
     * @throws RuntimeException wrapping an {@link IOException} raised inside the block
     */
    public static void verifyPresenceOrCreateDatabaseAndTable(Configuration configuration) {
        if (configuration.getBoolean("is.preview.enabled", false)) {
            return;
        }
        String projectId = configuration.get("project.id");
        String instanceId = configuration.get("instance.id");
        String databaseName = configuration.get("database.name");
        String serviceAccountType = configuration.get("service.account.type");
        String serviceAccount = configuration.get("service.account.path");
        String tableName = configuration.get("table");
        String keys = configuration.get("keys");

        try (Spanner spanner = SpannerUtil.getSpannerService(serviceAccount, "serviceFilePath".equals(serviceAccountType), projectId)) {
            // Only parse when a schema is configured, so that a missing entry reaches the explicit
            // "schema must be provided" check below instead of being handed to the JSON parser.
            String schemaJson = configuration.get("schema");
            Schema schema = schemaJson == null ? null : Schema.parseJson(schemaJson);

            DatabaseAdminClient dbAdminClient = spanner.getDatabaseAdminClient();
            Database database = SpannerUtil.getOrCreateDatabase(configuration, dbAdminClient, projectId, instanceId, databaseName);
            DatabaseClient dbClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseName));
            if (SpannerUtil.isTablePresent(dbClient, tableName)) {
                return;
            }
            if (schema == null) {
                throw new IllegalArgumentException(String.format("Spanner table %s does not exist. To create it from the pipeline, schema must be provided", tableName));
            }
            SpannerUtil.createTable(database, schema, databaseName, instanceId, tableName, keys);
        } catch (IOException e) {
            throw new RuntimeException("Exception while trying to get Spanner service. ", e);
        } catch (InterruptedException e) {
            throw SpannerExceptionFactory.propagateInterrupt(e);
        } catch (ExecutionException e) {
            throw SpannerExceptionFactory.asSpannerException(e.getCause());
        }
    }

    /**
     * Creates the table in the given database from the schema and waits for the DDL update to finish.
     *
     * @param database database on which the DDL is executed
     * @param schema schema that defines the columns
     * @param databaseName database name, used for logging only
     * @param instance instance name, used for logging only
     * @param tableName name of the table to create
     * @param keys primary key text for the table; must not be null or empty
     * @throws IllegalArgumentException if {@code keys} is null or empty
     * @throws ExecutionException if the DDL operation fails
     * @throws InterruptedException if interrupted while waiting for the DDL operation
     */
    private static void createTable(Database database, Schema schema, String databaseName, String instance, String tableName, String keys) throws ExecutionException, InterruptedException {
        boolean keysMissing = Strings.isNullOrEmpty(keys);
        if (keysMissing) {
            throw new IllegalArgumentException(String.format("Spanner table %s does not exist. To create it from the pipeline, primary keys must be provided", tableName));
        }
        String ddl = SpannerUtil.convertSchemaToCreateStatement(tableName, keys, schema);
        LOG.debug("Creating table with create statement: {} in database {} of instance {}", ddl, databaseName, instance);
        // Block until the DDL update has completed.
        database.updateDdl(Collections.singletonList(ddl), null).get();
    }

    /**
     * Checks whether a table exists by querying {@code information_schema.tables} for its name.
     *
     * @param dbClient client of the database to query
     * @param table table name to look for
     * @return {@code true} if the query returns at least one row
     */
    private static boolean isTablePresent(DatabaseClient dbClient, String table) {
        Statement statement = Statement.newBuilder(String.format("SELECT\n    t.table_name\nFROM\n    information_schema.tables AS t\nWHERE\n    t.table_catalog = '' AND t.table_schema = '' AND\n    t.table_name = @%s", TABLE_NAME))
            .bind(TABLE_NAME).to(table)
            .build();
        // try-with-resources closes the result set even when next() throws.
        try (com.google.cloud.spanner.ResultSet resultSet = dbClient.singleUse().executeQuery(statement)) {
            return resultSet.next();
        }
    }

    /**
     * Looks up a database, treating a {@code NOT_FOUND} error as "absent".
     *
     * @param dbAdminClient admin client used for the lookup
     * @param instance instance that should contain the database
     * @param databaseName database name
     * @return the database, or {@code null} when the lookup fails with {@link ErrorCode#NOT_FOUND}
     * @throws SpannerException for any other Spanner error
     */
    @Nullable
    private static Database getDatabaseIfPresent(DatabaseAdminClient dbAdminClient, String instance, String databaseName) {
        try {
            return dbAdminClient.getDatabase(instance, databaseName);
        } catch (SpannerException e) {
            if (e.getErrorCode() != ErrorCode.NOT_FOUND) {
                throw e;
            }
            return null;
        }
    }

    /**
     * Returns the database if it exists, otherwise creates it and waits up to 120 seconds for the
     * creation to complete. When {@code cmek.key} is set in the configuration, the new database is
     * created with that customer-managed encryption key.
     *
     * @param configuration configuration read for the optional {@code cmek.key}
     * @param dbAdminClient admin client used to look up and create the database
     * @param project project of the database
     * @param instance instance of the database
     * @param databaseName database name
     * @return the existing or newly created database
     * @throws SpannerException if creation fails, is interrupted or times out
     */
    private static Database getOrCreateDatabase(Configuration configuration, DatabaseAdminClient dbAdminClient, String project, String instance, String databaseName) throws ExecutionException, InterruptedException {
        Database existing = SpannerUtil.getDatabaseIfPresent(dbAdminClient, instance, databaseName);
        if (existing != null) {
            return existing;
        }

        LOG.debug("Database not found. Creating database {} in instance {}.", databaseName, instance);
        DatabaseId databaseId = DatabaseId.of(project, instance, databaseName);
        Database.Builder dbBuilder = dbAdminClient.newDatabaseBuilder(databaseId);
        String cmekKeyName = configuration.get("cmek.key");
        if (cmekKeyName != null) {
            dbBuilder.setEncryptionConfig(EncryptionConfigs.customerManagedEncryption(cmekKeyName));
        }
        OperationFuture<Database, ?> creation = dbAdminClient.createDatabase(dbBuilder.build(), Collections.emptyList());
        try {
            return creation.get(120L, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw SpannerExceptionFactory.asSpannerException(e.getCause());
        } catch (InterruptedException e) {
            throw SpannerExceptionFactory.propagateInterrupt(e);
        } catch (TimeoutException e) {
            throw SpannerExceptionFactory.propagateTimeout(e);
        }
    }
}

