/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.EncryptionConfiguration;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig;
import io.cdap.plugin.gcp.bigquery.sink.AvroOutputFormat;
import io.cdap.plugin.gcp.bigquery.sink.Operation;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Static helpers shared by the BigQuery sink plugins.
 *
 * <p>Covers:
 * <ul>
 *   <li>creating the target dataset and the staging Cloud Storage bucket;</li>
 *   <li>configuring the Hadoop output;</li>
 *   <li>translating CDAP schemas into BigQuery schemas;</li>
 *   <li>validating a CDAP schema against an existing BigQuery table;</li>
 *   <li>building the UPDATE / MERGE statements used by the update and upsert operations.</li>
 * </ul>
 *
 * <p>Naming: inside this class the simple name {@code Schema} always means the CDAP
 * {@code io.cdap.cdap.api.data.schema.Schema}. The BigQuery schema is always written fully
 * qualified as {@code com.google.cloud.bigquery.Schema}.
 *
 * <p>NOTE(review): this file imports both types under the simple name {@code Schema}, which
 * javac rejects (conflicting single-type imports). The {@code com.google.cloud.bigquery.Schema}
 * import should be removed.
 */
public final class BigQuerySinkUtils {
    /** Format of a {@code gs://<bucket>/<path>} URI. */
    public static final String GS_PATH_FORMAT = "gs://%s/%s";
    private static final String TEMPORARY_BUCKET_FORMAT = "gs://%s/%s/input/%s-%s";
    private static final String DATETIME = "DATETIME";
    private static final String RECORD = "RECORD";
    // NOTE(review): GSON and LIST_OF_FIELD_TYPE are not referenced anywhere in this class.
    private static final Gson GSON = new Gson();
    private static final Type LIST_OF_FIELD_TYPE = new TypeToken<ArrayList<Field>>() { }.getType();
    private static final String CRITERIA_TEMPLATE = "T.`%s` = S.`%s`";
    private static final String SOURCE_DATA_QUERY = "(SELECT * FROM (SELECT row_number() OVER (PARTITION BY %s%s) as rowid, * FROM %s) where rowid = 1)";
    private static final String UPDATE_QUERY = "UPDATE %s T SET %s FROM %s S WHERE %s";
    private static final String UPSERT_QUERY = "MERGE %s T USING %s S ON %s WHEN MATCHED THEN UPDATE SET %s WHEN NOT MATCHED THEN INSERT (%s) VALUES(%s)";
    private static final String INSERT_ONLY_UPSERT_QUERY = "MERGE %s T USING %s S ON %s WHEN NOT MATCHED THEN INSERT (%s) VALUES(%s)";
    private static final List<String> COMPARISON_OPERATORS = Arrays.asList("=", "<", ">", "<=", ">=", "!=", "<>", "LIKE", "NOT LIKE", "BETWEEN", "NOT BETWEEN", "IN", "NOT IN", "IS NULL", "IS NOT NULL", "IS TRUE", "IS NOT TRUE", "IS FALSE", "IS NOT FALSE");
    /** Number of words in the longest entry of {@link #COMPARISON_OPERATORS} (e.g. "IS NOT NULL"). */
    private static final int MAX_OPERATOR_WORDS = COMPARISON_OPERATORS.stream()
        .mapToInt(operator -> operator.split(" ").length)
        .max()
        .orElse(1);
    /** HTTP status (409 Conflict) returned by BigQuery / Cloud Storage when the resource already exists. */
    private static final int HTTP_CONFLICT = 409;
    public static final String BYTES_PROCESSED_METRIC = "bytes.processed";

    /**
     * Looks up the BigQuery dataset and the Cloud Storage bucket, then creates whichever of them is missing.
     *
     * @param bigQuery    BigQuery client
     * @param storage     Cloud Storage client
     * @param datasetId   dataset to look up / create
     * @param bucketName  bucket to look up / create
     * @param location    location used only when neither resource exists yet
     * @param cmekKeyName optional customer-managed encryption key applied to created resources
     * @throws IOException if a missing resource cannot be created
     */
    public static void createResources(BigQuery bigQuery, Storage storage, DatasetId datasetId, String bucketName,
                                       @Nullable String location, @Nullable CryptoKeyName cmekKeyName) throws IOException {
        Dataset dataset = bigQuery.getDataset(datasetId);
        Bucket bucket = storage.get(bucketName);
        createResources(bigQuery, dataset, datasetId, storage, bucket, bucketName, location, cmekKeyName);
    }

    /**
     * Creates the dataset and/or bucket when they are missing.
     *
     * <ul>
     *   <li>If neither exists, both are created in {@code location}.</li>
     *   <li>If exactly one exists, the other is created in the existing one's location, so that
     *       both end up co-located.</li>
     *   <li>If both exist, nothing happens.</li>
     * </ul>
     *
     * <p>A resource that turns out to exist already (HTTP 409) is not an error.
     *
     * @param dataset the existing dataset, or {@code null} if it does not exist
     * @param bucket  the existing bucket, or {@code null} if it does not exist
     * @throws IOException if a missing resource cannot be created
     */
    public static void createResources(BigQuery bigQuery, @Nullable Dataset dataset, DatasetId datasetId, Storage storage,
                                       @Nullable Bucket bucket, String bucketName, @Nullable String location,
                                       @Nullable CryptoKeyName cmekKey) throws IOException {
        if (dataset == null && bucket == null) {
            createBucket(storage, bucketName, location, cmekKey,
                () -> String.format("Unable to create Cloud Storage bucket '%s'", bucketName));
            createDataset(bigQuery, datasetId, location, cmekKey,
                () -> String.format("Unable to create BigQuery dataset '%s.%s'", datasetId.getProject(), datasetId.getDataset()));
        } else if (bucket == null) {
            createBucket(storage, bucketName, dataset.getLocation(), cmekKey,
                () -> String.format("Unable to create Cloud Storage bucket '%s' in the same location ('%s') as BigQuery dataset '%s'. Please use a bucket that is in the same location as the dataset.",
                    bucketName, dataset.getLocation(), datasetId.getProject() + "." + datasetId.getDataset()));
        } else if (dataset == null) {
            // Report the dataset as "project.dataset", like the other messages, rather than DatasetId.toString().
            createDataset(bigQuery, datasetId, bucket.getLocation(), cmekKey,
                () -> String.format("Unable to create BigQuery dataset '%s' in the same location ('%s') as Cloud Storage bucket '%s'. Please use a bucket that is in a supported location.",
                    datasetId.getProject() + "." + datasetId.getDataset(), bucket.getLocation(), bucketName));
        }
    }

    /**
     * Creates a BigQuery dataset, optionally pinned to a location and with a default CMEK key.
     * A 409 response (dataset already exists) is ignored.
     *
     * @throws IOException with {@code errorMessage} and the original cause for any other BigQuery failure
     */
    private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullable String location,
                                      @Nullable CryptoKeyName cmekKeyName, Supplier<String> errorMessage) throws IOException {
        DatasetInfo.Builder builder = DatasetInfo.newBuilder(dataset);
        if (location != null) {
            builder.setLocation(location);
        }
        if (cmekKeyName != null) {
            builder.setDefaultEncryptionConfiguration(
                EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKeyName.toString()).build());
        }
        try {
            bigQuery.create(builder.build());
        } catch (BigQueryException e) {
            // 409: the dataset already exists (presumably created concurrently); treat as success.
            if (e.getCode() != HTTP_CONFLICT) {
                throw new IOException(errorMessage.get(), e);
            }
        }
    }

    /**
     * Creates the dataset only if {@link BigQuery#getDataset} does not find it.
     *
     * @throws IOException if the dataset is missing and cannot be created
     */
    public static void createDatasetIfNotExists(BigQuery bigQuery, DatasetId datasetId, @Nullable String location,
                                                @Nullable CryptoKeyName cmekKeyName, Supplier<String> errorMessage) throws IOException {
        if (bigQuery.getDataset(datasetId) == null) {
            createDataset(bigQuery, datasetId, location, cmekKeyName, errorMessage);
        }
    }

    /**
     * Creates a Cloud Storage bucket through {@link GCPUtils#createBucket}.
     * A 409 response (bucket already exists) is ignored.
     *
     * @throws IOException with {@code errorMessage} and the original cause for any other storage failure
     */
    private static void createBucket(Storage storage, String bucket, @Nullable String location,
                                     @Nullable CryptoKeyName cmekKeyName, Supplier<String> errorMessage) throws IOException {
        try {
            GCPUtils.createBucket(storage, bucket, location, cmekKeyName);
        } catch (StorageException e) {
            if (e.getCode() != HTTP_CONFLICT) {
                throw new IOException(errorMessage.get(), e);
            }
        }
    }

    /**
     * Configures the GCS file system for the staging bucket.
     *
     * <p>When {@code bucket} is {@code null}, a bucket named after {@code runId} is used and
     * {@code fs.gs.bucket.delete.enable} is turned on.
     *
     * @return the bucket name that was configured
     */
    public static String configureBucket(Configuration baseConfiguration, @Nullable String bucket, String runId) {
        if (bucket == null) {
            return configureBucket(baseConfiguration, runId, runId, true);
        }
        return configureBucket(baseConfiguration, bucket, runId, false);
    }

    /**
     * Writes the GCS-related Hadoop properties into {@code baseConfiguration}.
     *
     * <p>Sets {@code fs.default.name} to {@code gs://<bucket>/<runId>} and disables the
     * {@code gs} FileSystem cache and the GCS metadata cache. When {@code deleteBucket} is true,
     * it also sets {@code fs.gs.bucket.delete.enable}.
     *
     * @return {@code bucket}
     */
    public static String configureBucket(Configuration baseConfiguration, String bucket, String runId, boolean deleteBucket) {
        if (deleteBucket) {
            baseConfiguration.setBoolean("fs.gs.bucket.delete.enable", true);
        }
        baseConfiguration.set("fs.default.name", String.format(GS_PATH_FORMAT, bucket, runId));
        baseConfiguration.setBoolean("fs.gs.impl.disable.cache", true);
        baseConfiguration.setBoolean("fs.gs.metadata.cache.enable", false);
        return bucket;
    }

    /**
     * Configures the BigQuery Hadoop output for {@code project:dataset.table}.
     *
     * <p>Output is staged under {@code gcsPath}. The file format (Avro or newline-delimited JSON)
     * is chosen from {@code fields}. An empty field list leaves the output table schema without fields.
     */
    public static void configureOutput(Configuration configuration, DatasetId datasetId, String tableName, String gcsPath,
                                       List<BigQueryTableFieldSchema> fields) throws IOException {
        BigQueryTableSchema outputTableSchema = new BigQueryTableSchema();
        if (!fields.isEmpty()) {
            outputTableSchema.setFields(fields);
        }
        BigQueryFileFormat fileFormat = getFileFormat(fields);
        String qualifiedTableId = String.format("%s:%s.%s", datasetId.getProject(), datasetId.getDataset(), tableName);
        BigQueryOutputConfiguration.configure(configuration, qualifiedTableId, outputTableSchema, gcsPath, fileFormat,
            getOutputFormat(fileFormat));
    }

    /**
     * Same as {@link #configureOutput}.
     * It additionally sets {@code cdap.bq.sink.operation} to {@link Operation#INSERT}.
     */
    public static void configureMultiSinkOutput(Configuration configuration, DatasetId datasetId, String tableName,
                                                String gcsPath, List<BigQueryTableFieldSchema> fields) throws IOException {
        configureOutput(configuration, datasetId, tableName, gcsPath, fields);
        configuration.set("cdap.bq.sink.operation", Operation.INSERT.name());
    }

    /** Returns {@code gs://<bucket>/<pathPrefix>/input/<tableName>-<pathPrefix>}. */
    public static String getTemporaryGcsPath(String bucket, String pathPrefix, String tableName) {
        return String.format(TEMPORARY_BUCKET_FORMAT, bucket, pathPrefix, tableName, pathPrefix);
    }

    /**
     * Converts every top-level field of a CDAP record schema into a BigQuery output field schema.
     *
     * @throws NullPointerException if {@code tableSchema} has no fields
     */
    public static List<BigQueryTableFieldSchema> getBigQueryTableFieldsFromSchema(Schema tableSchema) {
        List<Schema.Field> inputFields = Objects.requireNonNull(tableSchema.getFields(), "Schema must have fields");
        return inputFields.stream().map(BigQuerySinkUtils::generateTableFieldSchema).collect(Collectors.toList());
    }

    /** Updates {@code destinationTable} with the relaxed merge of both tables' schemas. */
    public static void relaxTableSchema(BigQuery bigquery, Table sourceTable, Table destinationTable) {
        FieldList sourceFields = sourceTable.getDefinition().getSchema().getFields();
        FieldList destinationFields = destinationTable.getDefinition().getSchema().getFields();
        relaxTableSchema(bigquery, destinationTable, sourceFields, destinationFields);
    }

    /**
     * Replaces the schema of {@code destinationTable} with
     * {@link #getRelaxedTableFields(List, List)} of the given field lists.
     */
    public static void relaxTableSchema(BigQuery bigquery, Table destinationTable, List<Field> sourceFields,
                                        List<Field> destinationFields) {
        List<Field> resultFieldsList = getRelaxedTableFields(sourceFields, destinationFields);
        com.google.cloud.bigquery.Schema newSchema = com.google.cloud.bigquery.Schema.of(resultFieldsList);
        bigquery.update(destinationTable.toBuilder()
            .setDefinition(destinationTable.getDefinition().toBuilder().setSchema(newSchema).build())
            .build());
    }

    /**
     * Merges two field lists into a relaxed schema.
     *
     * <p>The result contains the destination-only fields, followed by every source field in source
     * order. A source field that is REQUIRED is downgraded to NULLABLE when the destination has a
     * nullable field of the same name. A {@code null} destination mode counts as nullable, since
     * BigQuery defaults the mode to NULLABLE.
     *
     * @throws IllegalStateException if either list contains duplicate field names
     */
    public static List<Field> getRelaxedTableFields(List<Field> sourceFields, List<Field> destinationFields) {
        Map<String, Field> destinationFieldMap = destinationFields.stream()
            .collect(Collectors.toMap(Field::getName, field -> field));
        Map<String, Field> sourceFieldMap = sourceFields.stream()
            .collect(Collectors.toMap(Field::getName, field -> field));
        List<Field> resultingFields = destinationFields.stream()
            .filter(field -> !sourceFieldMap.containsKey(field.getName()))
            .collect(Collectors.toCollection(ArrayList::new));
        // Iterate the list (not the HashMap) so the resulting column order is deterministic.
        for (Field sourceField : sourceFields) {
            Field destinationField = destinationFieldMap.get(sourceField.getName());
            Field resultField = sourceField;
            if (destinationField != null && isNullableMode(destinationField.getMode())
                && sourceField.getMode() == Field.Mode.REQUIRED) {
                resultField = sourceField.toBuilder().setMode(Field.Mode.NULLABLE).build();
            }
            resultingFields.add(resultField);
        }
        return resultingFields;
    }

    /** BigQuery treats an unset ({@code null}) mode as NULLABLE. */
    private static boolean isNullableMode(@Nullable Field.Mode mode) {
        return mode == null || mode == Field.Mode.NULLABLE;
    }

    /** Converts one CDAP field (recursively for records) into a BigQuery output field schema. */
    private static BigQueryTableFieldSchema generateTableFieldSchema(Schema.Field field) {
        BigQueryTableFieldSchema fieldSchema = new BigQueryTableFieldSchema();
        fieldSchema.setName(field.getName());
        fieldSchema.setMode(getMode(field.getSchema()).name());
        LegacySQLTypeName type = getTableDataType(field.getSchema());
        fieldSchema.setType(type.name());
        if (type == LegacySQLTypeName.RECORD) {
            fieldSchema.setFields(getRecordFields(field.getSchema()).stream()
                .map(BigQuerySinkUtils::generateTableFieldSchema)
                .collect(Collectors.toList()));
        }
        return fieldSchema;
    }

    /**
     * Returns the sub-fields of a (possibly nullable) record schema, or of the record element of an array schema.
     *
     * @throws NullPointerException if the resolved record has no fields
     */
    private static List<Schema.Field> getRecordFields(Schema schema) {
        Schema nonNullable = BigQueryUtil.getNonNullableSchema(schema);
        Schema record = nonNullable.getType() == Schema.Type.ARRAY
            ? BigQueryUtil.getNonNullableSchema(nonNullable.getComponentSchema())
            : nonNullable;
        return Objects.requireNonNull(record.getFields());
    }

    /**
     * Converts a CDAP record schema into a BigQuery table schema.
     *
     * @throws NullPointerException if {@code schema} has no fields
     */
    public static com.google.cloud.bigquery.Schema convertCdapSchemaToBigQuerySchema(Schema schema) {
        List<Schema.Field> inputFields = Objects.requireNonNull(schema.getFields(), "Schema must have fields");
        List<Field> fields = inputFields.stream()
            .map(BigQuerySinkUtils::convertCdapFieldToBigQueryField)
            .collect(Collectors.toList());
        return com.google.cloud.bigquery.Schema.of(fields);
    }

    /**
     * Converts one CDAP field (recursively for records) into a BigQuery {@link Field}.
     * For NUMERIC / BIGNUMERIC it copies precision and scale from the decimal schema.
     */
    private static Field convertCdapFieldToBigQueryField(Schema.Field field) {
        String name = field.getName();
        LegacySQLTypeName type = getTableDataType(field.getSchema());
        Field.Builder fieldBuilder;
        if (type == LegacySQLTypeName.RECORD) {
            List<Field> subFields = getRecordFields(field.getSchema()).stream()
                .map(BigQuerySinkUtils::convertCdapFieldToBigQueryField)
                .collect(Collectors.toList());
            fieldBuilder = Field.newBuilder(name, type, FieldList.of(subFields));
        } else {
            fieldBuilder = Field.newBuilder(name, type);
        }
        fieldBuilder.setMode(getMode(field.getSchema()));
        if (type == LegacySQLTypeName.NUMERIC || type == LegacySQLTypeName.BIGNUMERIC) {
            Schema decimalSchema = BigQueryUtil.getNonNullableSchema(field.getSchema());
            // For REPEATED decimals the precision/scale live on the element schema, not on the array schema.
            if (decimalSchema.getType() == Schema.Type.ARRAY) {
                decimalSchema = BigQueryUtil.getNonNullableSchema(decimalSchema.getComponentSchema());
            }
            fieldBuilder.setPrecision((long) decimalSchema.getPrecision());
            fieldBuilder.setScale((long) decimalSchema.getScale());
        }
        return fieldBuilder.build();
    }

    /**
     * Maps a CDAP field schema to a BigQuery mode.
     * Arrays map to REPEATED (even when nullable), other nullable types to NULLABLE, and everything else to REQUIRED.
     */
    private static Field.Mode getMode(Schema schema) {
        boolean isNullable = schema.isNullable();
        Schema.Type nonNullableType = isNullable ? schema.getNonNullable().getType() : schema.getType();
        if (nonNullableType == Schema.Type.ARRAY) {
            return Field.Mode.REPEATED;
        }
        return isNullable ? Field.Mode.NULLABLE : Field.Mode.REQUIRED;
    }

    /**
     * Maps a CDAP schema (nullability stripped) to a BigQuery legacy SQL type.
     * Arrays map to their element type; the REPEATED mode is handled by {@link #getMode}.
     *
     * @throws IllegalStateException for logical or physical types with no mapping
     */
    private static LegacySQLTypeName getTableDataType(Schema schema) {
        Schema nonNullable = BigQueryUtil.getNonNullableSchema(schema);
        Schema.LogicalType logicalType = nonNullable.getLogicalType();
        if (logicalType != null) {
            switch (logicalType) {
                case DATE:
                    return LegacySQLTypeName.DATE;
                case TIME_MILLIS:
                case TIME_MICROS:
                    return LegacySQLTypeName.TIME;
                case TIMESTAMP_MILLIS:
                case TIMESTAMP_MICROS:
                    return LegacySQLTypeName.TIMESTAMP;
                case DECIMAL:
                    // NUMERIC holds scale <= 9, precision <= 38 and at most 29 integer digits; larger needs BIGNUMERIC.
                    if (nonNullable.getScale() <= 9 && nonNullable.getPrecision() <= 38
                        && nonNullable.getPrecision() - nonNullable.getScale() <= 29) {
                        return LegacySQLTypeName.NUMERIC;
                    }
                    return LegacySQLTypeName.BIGNUMERIC;
                case DATETIME:
                    return LegacySQLTypeName.DATETIME;
                default:
                    throw new IllegalStateException("Unsupported type " + logicalType.getToken());
            }
        }
        Schema.Type type = nonNullable.getType();
        switch (type) {
            case INT:
            case LONG:
                return LegacySQLTypeName.INTEGER;
            case STRING:
                return LegacySQLTypeName.STRING;
            case FLOAT:
            case DOUBLE:
                return LegacySQLTypeName.FLOAT;
            case BOOLEAN:
                return LegacySQLTypeName.BOOLEAN;
            case BYTES:
                return LegacySQLTypeName.BYTES;
            case ARRAY:
                return getTableDataType(nonNullable.getComponentSchema());
            case RECORD:
                return LegacySQLTypeName.RECORD;
            default:
                throw new IllegalStateException("Unsupported type " + type);
        }
    }

    /**
     * Chooses the staging file format.
     *
     * <p>Returns NEWLINE_DELIMITED_JSON if any field, at any RECORD nesting depth, is DATETIME;
     * otherwise returns AVRO. Presumably the Avro load path cannot carry DATETIME — TODO confirm.
     * A {@code null} field list is treated as empty.
     */
    private static BigQueryFileFormat getFileFormat(@Nullable List<BigQueryTableFieldSchema> fields) {
        if (fields == null) {
            return BigQueryFileFormat.AVRO;
        }
        for (BigQueryTableFieldSchema field : fields) {
            if (DATETIME.equals(field.getType())) {
                return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
            }
            if (RECORD.equals(field.getType())
                && getFileFormat(field.getFields()) == BigQueryFileFormat.NEWLINE_DELIMITED_JSON) {
                return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
            }
        }
        return BigQueryFileFormat.AVRO;
    }

    /** Text output for JSON staging files, Avro output otherwise. */
    private static Class<? extends FileOutputFormat> getOutputFormat(BigQueryFileFormat fileFormat) {
        if (fileFormat == BigQueryFileFormat.NEWLINE_DELIMITED_JSON) {
            return TextOutputFormat.class;
        }
        return AvroOutputFormat.class;
    }

    /**
     * Builds the BigQuery statement that applies the staged rows in {@code sourceTableId} to
     * {@code destinationTableId}.
     *
     * <p>The source is first de-duplicated: one row per key, picked by {@code row_number()} over the
     * optional ORDER BY list. UPDATE yields an {@code UPDATE ... FROM} statement. UPSERT yields a
     * MERGE, or an insert-only MERGE when every field is a key. Any other operation yields {@code ""}.
     *
     * @param tableFieldsList all column names written
     * @param tableKeyList    key column names used for matching
     * @param orderedByList   entries such as {@code "ts DESC"}; the first word is the column
     * @param partitionFilter optional SQL predicate on the destination. Blank means "no filter".
     *                        Columns directly before comparison operators are qualified with {@code T.}.
     */
    public static String generateUpdateUpsertQuery(Operation operation, TableId sourceTableId, TableId destinationTableId,
                                                   List<String> tableFieldsList, List<String> tableKeyList,
                                                   List<String> orderedByList, @Nullable String partitionFilter) {
        String source = quoteTableId(sourceTableId);
        String destination = quoteTableId(destinationTableId);
        String criteria = tableKeyList.stream()
            .map(key -> String.format(CRITERIA_TEMPLATE, key, key))
            .collect(Collectors.joining(" AND "));
        // A blank filter would otherwise produce the invalid predicate "() AND ...".
        if (partitionFilter != null && !partitionFilter.trim().isEmpty()) {
            criteria = String.format("(%s) AND %s", formatPartitionFilter(partitionFilter), criteria);
        }
        String fieldsForUpdate = tableFieldsList.stream()
            .filter(field -> !tableKeyList.contains(field))
            .map(field -> String.format(CRITERIA_TEMPLATE, field, field))
            .collect(Collectors.joining(", "));
        String orderedBy = orderedByList.isEmpty()
            ? ""
            : " ORDER BY " + orderedByList.stream().map(BigQuerySinkUtils::quoteOrderByEntry).collect(Collectors.joining(", "));
        String sourceTable = String.format(SOURCE_DATA_QUERY, quoteAndJoin(tableKeyList), orderedBy, source);
        switch (operation) {
            case UPDATE:
                return String.format(UPDATE_QUERY, destination, fieldsForUpdate, sourceTable, criteria);
            case UPSERT: {
                String insertFields = quoteAndJoin(tableFieldsList);
                if (fieldsForUpdate.isEmpty()) {
                    return String.format(INSERT_ONLY_UPSERT_QUERY, destination, sourceTable, criteria, insertFields, insertFields);
                }
                return String.format(UPSERT_QUERY, destination, sourceTable, criteria, fieldsForUpdate, insertFields, insertFields);
            }
            default:
                return "";
        }
    }

    /** Returns {@code `project.dataset.table`}. */
    private static String quoteTableId(TableId tableId) {
        return String.format("`%s.%s.%s`", tableId.getProject(), tableId.getDataset(), tableId.getTable());
    }

    /** Returns the names backtick-quoted and comma-separated, e.g. {@code `a`, `b`}. */
    private static String quoteAndJoin(List<String> names) {
        return "`" + String.join("`, `", names) + "`";
    }

    /** Backtick-quotes the column of an ORDER BY entry: {@code "ts DESC"} becomes {@code "`ts` DESC"}. */
    private static String quoteOrderByEntry(String entry) {
        String[] parts = entry.trim().split(" ", 2);
        return "`" + parts[0] + "` " + (parts.length > 1 ? parts[1] : "");
    }

    /**
     * Prefixes with {@code T.} the word directly preceding each comparison operator, so that the
     * filter refers to the destination table alias.
     *
     * <p>Handles multi-word operators such as {@code NOT LIKE} and {@code IS NOT NULL}. An operator
     * with no preceding word is left untouched.
     */
    private static String formatPartitionFilter(String partitionFilter) {
        String[] queryWords = partitionFilter.split(" ");
        int index = 0;
        while (index < queryWords.length) {
            int operatorWords = comparisonOperatorLength(queryWords, index);
            if (operatorWords == 0) {
                index++;
                continue;
            }
            // Skip empty tokens produced by consecutive spaces to find the real operand.
            int operandIndex = index - 1;
            while (operandIndex >= 0 && queryWords[operandIndex].isEmpty()) {
                operandIndex--;
            }
            if (operandIndex >= 0) {
                queryWords[operandIndex] = "T." + queryWords[operandIndex];
            }
            index += operatorWords;
        }
        return String.join(" ", queryWords);
    }

    /**
     * Returns how many words starting at {@code start} form a comparison operator (longest match
     * first), or 0 if none.
     */
    private static int comparisonOperatorLength(String[] words, int start) {
        for (int length = Math.min(MAX_OPERATOR_WORDS, words.length - start); length > 0; length--) {
            String candidate = String.join(" ", Arrays.copyOfRange(words, start, start + length));
            // Locale.ROOT: e.g. in a Turkish locale "like".toUpperCase() is not "LIKE".
            if (COMPARISON_OPERATORS.contains(candidate.toUpperCase(Locale.ROOT))) {
                return length;
            }
        }
        return 0;
    }

    /**
     * Validates {@code tableSchema} against the existing table, if any, and returns the BigQuery
     * output fields for it.
     *
     * @return an empty list when {@code tableSchema} is {@code null}
     * @throws io.cdap.cdap.etl.api.validation.ValidationException (via the collector) on validation or lookup failures
     */
    public static List<BigQueryTableFieldSchema> getBigQueryTableFields(BigQuery bigQuery, String tableName,
                                                                        @Nullable Schema tableSchema,
                                                                        boolean allowSchemaRelaxation,
                                                                        String datasetProject, String dataset,
                                                                        boolean isTruncateTableSet,
                                                                        FailureCollector collector) {
        if (tableSchema == null) {
            return Collections.emptyList();
        }
        TableId tableId = TableId.of(datasetProject, dataset, tableName);
        try {
            Table table = bigQuery.getTable(tableId);
            if (table != null) {
                com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
                validateSchema(tableName, bqSchema, tableSchema, allowSchemaRelaxation, isTruncateTableSet, dataset, collector);
            }
        } catch (BigQueryException e) {
            collector.addFailure("Unable to get details about the BigQuery table: " + e.getMessage(), null)
                .withConfigProperty("table");
            throw collector.getOrThrowException();
        }
        return getBigQueryTableFieldsFromSchema(tableSchema);
    }

    /**
     * Validates a CDAP output schema against an existing BigQuery table schema and throws through
     * {@code collector} if any failure was recorded.
     *
     * <ul>
     *   <li>With relaxation and no truncate: fields absent from BigQuery must be nullable.</li>
     *   <li>Without relaxation: every CDAP field must exist in BigQuery, and every BigQuery column
     *       that is not NULLABLE (a {@code null} mode counts as NULLABLE) must be present in the schema.</li>
     *   <li>Unless both relaxation and truncate are set: matching fields must agree in type and mode.</li>
     * </ul>
     * Returns without checking when either schema is {@code null} or the BigQuery schema has no fields.
     */
    public static void validateSchema(String tableName, com.google.cloud.bigquery.Schema bqSchema,
                                      @Nullable Schema tableSchema, boolean allowSchemaRelaxation,
                                      boolean isTruncateTableSet, String dataset, FailureCollector collector) {
        if (bqSchema == null || bqSchema.getFields().isEmpty() || tableSchema == null) {
            return;
        }
        FieldList bqFields = bqSchema.getFields();
        List<Schema.Field> outputSchemaFields = Objects.requireNonNull(tableSchema.getFields());
        List<String> missingBQFields = BigQueryUtil.getSchemaMinusBqFields(outputSchemaFields, bqFields);
        if (allowSchemaRelaxation && !isTruncateTableSet) {
            List<String> nonNullableFields = missingBQFields.stream()
                .map(name -> tableSchema.getField(name))
                .filter(Objects::nonNull)
                .filter(field -> !field.getSchema().isNullable())
                .map(Schema.Field::getName)
                .collect(Collectors.toList());
            for (String nonNullableField : nonNullableFields) {
                collector.addFailure(String.format("Required field '%s' does not exist in BigQuery table '%s.%s'.",
                        nonNullableField, dataset, tableName), "Change the field to be nullable.")
                    .withInputSchemaField(nonNullableField).withOutputSchemaField(nonNullableField);
            }
        }
        if (!allowSchemaRelaxation) {
            for (String missingField : missingBQFields) {
                collector.addFailure(String.format("Field '%s' does not exist in BigQuery table '%s.%s'.",
                        missingField, dataset, tableName),
                        String.format("Remove '%s' from the input, or add a column to the BigQuery table.", missingField))
                    .withInputSchemaField(missingField).withOutputSchemaField(missingField);
            }
            List<String> remainingBQFields = BigQueryUtil.getBqFieldsMinusSchema(bqFields, outputSchemaFields);
            for (String remainingField : remainingBQFields) {
                if (isNullableMode(bqFields.get(remainingField).getMode())) {
                    continue;
                }
                collector.addFailure(String.format("Required Column '%s' is not present in the schema.", remainingField),
                    String.format("Add '%s' to the schema.", remainingField));
            }
        }
        if (!allowSchemaRelaxation || !isTruncateTableSet) {
            validateMatchingFields(tableSchema, bqFields, missingBQFields, allowSchemaRelaxation, dataset, tableName, collector);
        }
        collector.getOrThrowException();
    }

    /**
     * Validates the destination table for an insert and throws through {@code collector} if any
     * failure was recorded.
     *
     * <p>Every non-nullable BigQuery column must be present (a {@code null} mode counts as NULLABLE,
     * consistent with {@link #validateSchema}). Fields present in both schemas must agree in type
     * and mode. Nothing is checked when truncating, when {@code tableSchema} is {@code null}, or
     * when the table has no fields.
     */
    public static void validateInsertSchema(Table table, @Nullable Schema tableSchema, boolean allowSchemaRelaxation,
                                            boolean isTruncateTableSet, String dataset, FailureCollector collector) {
        com.google.cloud.bigquery.Schema bqSchema = table.getDefinition().getSchema();
        if (bqSchema == null || bqSchema.getFields().isEmpty()) {
            return;
        }
        if (isTruncateTableSet || tableSchema == null) {
            return;
        }
        FieldList bqFields = bqSchema.getFields();
        List<Schema.Field> outputSchemaFields = Objects.requireNonNull(tableSchema.getFields());
        List<String> remainingBQFields = BigQueryUtil.getBqFieldsMinusSchema(bqFields, outputSchemaFields);
        for (String remainingField : remainingBQFields) {
            if (isNullableMode(bqFields.get(remainingField).getMode())) {
                continue;
            }
            collector.addFailure(String.format("Required Column '%s' is not present in the schema.", remainingField),
                String.format("Add '%s' to the schema.", remainingField));
        }
        String tableName = table.getTableId().getTable();
        List<String> missingBQFields = BigQueryUtil.getSchemaMinusBqFields(outputSchemaFields, bqFields);
        validateMatchingFields(tableSchema, bqFields, missingBQFields, allowSchemaRelaxation, dataset, tableName, collector);
        collector.getOrThrowException();
    }

    /** Records type and mode mismatches for every CDAP field that also exists in the BigQuery schema. */
    private static void validateMatchingFields(Schema tableSchema, FieldList bqFields, List<String> missingBQFields,
                                               boolean allowSchemaRelaxation, String dataset, String tableName,
                                               FailureCollector collector) {
        for (Schema.Field field : tableSchema.getFields()) {
            String fieldName = field.getName();
            if (missingBQFields.contains(fieldName)) {
                continue;
            }
            ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bqFields.get(fieldName), field, dataset,
                tableName, AbstractBigQuerySinkConfig.SUPPORTED_TYPES, collector);
            if (failure != null) {
                failure.withInputSchemaField(fieldName).withOutputSchemaField(fieldName);
            }
            BigQueryUtil.validateFieldModeMatches(bqFields.get(fieldName), field, allowSchemaRelaxation, collector);
        }
    }

    /** Returns every name that occurs more than once in {@code schemaFields}. */
    public static Set<String> getDuplicatedFields(List<String> schemaFields) {
        Set<String> duplicatedFields = new HashSet<>();
        Set<String> seen = new HashSet<>();
        for (String field : schemaFields) {
            if (!seen.add(field)) {
                duplicatedFields.add(field);
            }
        }
        return duplicatedFields;
    }

    /** Returns true only for DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS and DECIMAL; {@code null} is unsupported. */
    public static boolean isSupportedLogicalType(Schema.LogicalType logicalType) {
        if (logicalType == null) {
            return false;
        }
        return logicalType == Schema.LogicalType.DATE || logicalType == Schema.LogicalType.TIMESTAMP_MICROS
            || logicalType == Schema.LogicalType.TIMESTAMP_MILLIS || logicalType == Schema.LogicalType.DECIMAL;
    }

    /** Counts occurrences of the first space-separated word of each value. */
    public static Map<String, Integer> calculateDuplicates(List<String> values) {
        return values.stream()
            .map(value -> value.split(" ")[0])
            .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum));
    }

    /**
     * Registers the external dataset for lineage.
     *
     * <p>When {@code fieldNames} is non-empty, it also records a write operation named
     * {@code Write} or {@code Write_To_<suffix>}.
     */
    public static void recordLineage(BatchSinkContext context, Asset asset, Schema tableSchema, List<String> fieldNames,
                                     @Nullable String operationNameSuffix) {
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext) context, asset);
        lineageRecorder.createExternalDataset(tableSchema);
        if (!fieldNames.isEmpty()) {
            String operationName = operationNameSuffix == null ? "Write" : "Write_To_" + operationNameSuffix;
            lineageRecorder.recordWrite(operationName, "Wrote to BigQuery table.", fieldNames);
        }
    }
}

