/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigtable.sink;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
import io.cdap.plugin.gcp.bigtable.sink.BigtableOutputFormat;
import io.cdap.plugin.gcp.bigtable.sink.BigtableSinkConfig;
import io.cdap.plugin.gcp.bigtable.sink.RecordToHBaseMutationTransformer;
import io.cdap.plugin.gcp.common.SourceOutputFormatProvider;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch sink that converts each incoming {@link StructuredRecord} into an HBase {@link Mutation}
 * and writes it to a Bigtable table through {@link BigtableOutputFormat}.
 *
 * <p>At configure time the sink validates its configuration and the input schema and, when the
 * connection parameters are available, checks that the column families referenced by the column
 * mappings exist in the target table. At prepare-run time the same table check is repeated and
 * the table is created when it does not exist yet.
 */
@Plugin(type="batchsink")
@Name(value="Bigtable")
@Description(value="This sink writes data to Google Cloud Bigtable. Cloud Bigtable is Google's NoSQL Big Data database service.")
public final class BigtableSink extends BatchSink<StructuredRecord, ImmutableBytesWritable, Mutation> {
    private static final Logger LOG = LoggerFactory.getLogger(BigtableSink.class);
    public static final String NAME = "Bigtable";
    /** Physical schema types accepted for input fields (after unwrapping nullable). */
    private static final Set<Schema.Type> SUPPORTED_FIELD_TYPES = ImmutableSet.of(
        Schema.Type.BOOLEAN,
        Schema.Type.INT,
        Schema.Type.LONG,
        Schema.Type.FLOAT,
        Schema.Type.DOUBLE,
        Schema.Type.BYTES,
        Schema.Type.STRING);
    private final BigtableSinkConfig config;
    private RecordToHBaseMutationTransformer transformer;

    /**
     * Creates the sink.
     *
     * @param config plugin configuration used by every lifecycle method
     */
    public BigtableSink(BigtableSinkConfig config) {
        this.config = config;
    }

    /**
     * Validates the configuration and, if known, the input schema. When the connection parameters
     * are configured, also checks an already existing target table against the column mappings.
     *
     * <p>A failure to reach Bigtable at this stage is only logged; it does not fail validation.
     *
     * @param configurer pipeline configurer providing the stage's input schema and failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer configurer) {
        super.configurePipeline(configurer);
        StageConfigurer stageConfigurer = configurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();
        config.validate(collector);

        Schema inputSchema = stageConfigurer.getInputSchema();
        if (inputSchema != null) {
            validateInputSchema(inputSchema, collector);
        }

        if (!config.connectionParamsConfigured()) {
            return;
        }
        Configuration conf = getConfiguration();
        try (Connection connection = BigtableConfiguration.connect(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf(config.table);
            if (admin.tableExists(tableName)) {
                validateExistingTable(connection, tableName, collector);
            }
        } catch (IOException e) {
            // Best effort: connectivity problems at configure time must not block deployment.
            LOG.warn("Failed to connect to BigTable.", e);
        }
    }

    /**
     * Validates the configuration, validates or creates the target table, records lineage and
     * registers the Bigtable output format as this stage's output.
     *
     * @param context sink context for the current run
     */
    @Override
    public void prepareRun(BatchSinkContext context) {
        FailureCollector collector = context.getFailureCollector();
        config.validate(collector);
        Configuration conf = getConfiguration();
        try (Connection connection = BigtableConfiguration.connect(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf(config.table);
            if (admin.tableExists(tableName)) {
                validateExistingTable(connection, tableName, collector);
            } else {
                createTable(connection, tableName, collector);
            }
        } catch (IOException e) {
            collector.addFailure(String.format("Failed to connect to Bigtable : %s", e.getMessage()), null)
                .withConfigProperty("bigtableOptions")
                .withStacktrace(e.getStackTrace());
        }
        // Abort the run before registering any output if anything above reported a failure.
        collector.getOrThrowException();

        emitLineage(context);
        context.addOutput(Output.of(config.getReferenceName(),
                                    new SourceOutputFormatProvider(BigtableOutputFormat.class, conf)));
    }

    /**
     * Builds the record-to-mutation transformer from the key alias and the column mappings.
     *
     * @param context runtime context of the stage
     * @throws Exception if the parent initialization fails or the column mappings are invalid
     */
    @Override
    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        FailureCollector collector = context.getFailureCollector();
        Map<String, HBaseColumn> columnMappings = config.getColumnMappings(collector);
        // Fail fast instead of silently building a transformer from partially parsed mappings;
        // prepareRun raises the same collected failures in the same way.
        collector.getOrThrowException();
        transformer = new RecordToHBaseMutationTransformer(config.keyAlias, columnMappings);
    }

    /**
     * Converts one record into a mutation and emits it with a {@code null} key.
     *
     * @param record input record
     * @param emitter receiver of the resulting key/value pair
     */
    @Override
    public void transform(StructuredRecord record, Emitter<KeyValue<ImmutableBytesWritable, Mutation>> emitter) {
        Mutation mutation = transformer.transform(record);
        emitter.emit(new KeyValue<>(null, mutation));
    }

    /**
     * Assembles the Hadoop configuration used to connect to Bigtable: optional service account
     * credentials, project and instance, the output table, and finally the user supplied Bigtable
     * options, which are applied last and therefore override earlier settings with the same key.
     *
     * @return a new configuration instance
     */
    private Configuration getConfiguration() {
        Configuration conf = new Configuration();
        String serviceAccount = config.getServiceAccount();
        if (serviceAccount != null) {
            conf.setBoolean("google.bigtable.auth.service.account.enable", true);
            // The service account is either a path to a key file or the JSON content itself.
            String credentialKey = config.isServiceAccountFilePath()
                ? "google.bigtable.auth.json.keyfile"
                : "google.bigtable.auth.json.value";
            conf.set(credentialKey, serviceAccount);
        }
        BigtableConfiguration.configure(conf, config.getProject(), config.instance);
        conf.set("hbase.mapred.outputtable", config.table);
        config.getBigtableOptions().forEach(conf::set);
        return conf;
    }

    /**
     * Checks that the key alias field exists (unless it is a macro), that every field has a
     * supported type, and that every mapped column refers to a field of the input schema.
     *
     * @param inputSchema schema of the records arriving at this sink
     * @param collector collector receiving one failure per problem found
     */
    private void validateInputSchema(Schema inputSchema, FailureCollector collector) {
        if (!config.containsMacro("keyAlias") && inputSchema.getField(config.keyAlias) == null) {
            collector.addFailure(
                String.format("Field '%s' declared as key alias does not exist in input schema.", config.keyAlias),
                "Specify input field name as key alias.")
                .withConfigProperty("keyAlias");
        }

        List<Schema.Field> fields = inputSchema.getFields();
        if (fields == null || fields.isEmpty()) {
            collector.addFailure("Input schema must contain fields.", null);
            throw collector.getOrThrowException();
        }

        // Built once for all fields; Locale.ROOT keeps the type names stable regardless of the
        // JVM default locale (e.g. Turkish would otherwise lowercase "INT" to a dotless i).
        String supportedTypes = SUPPORTED_FIELD_TYPES.stream()
            .map(Enum::name)
            .map(typeName -> typeName.toLowerCase(Locale.ROOT))
            .collect(Collectors.joining(", "));
        String correctiveAction = String.format("Supported types are: datetime, %s.", supportedTypes);

        Set<String> fieldNames = new HashSet<>();
        for (Schema.Field field : fields) {
            fieldNames.add(field.getName());
            Schema fieldSchema = field.getSchema();
            Schema nonNullableSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
            Schema.LogicalType logicalType = nonNullableSchema.getLogicalType();
            // Of all logical types only datetime is accepted; plain physical types have none.
            boolean logicalTypeSupported = logicalType == null || logicalType == Schema.LogicalType.DATETIME;
            if (SUPPORTED_FIELD_TYPES.contains(nonNullableSchema.getType()) && logicalTypeSupported) {
                continue;
            }
            String errorMessage = String.format("Field '%s' is of unsupported type '%s'.",
                                                field.getName(), nonNullableSchema.getDisplayName());
            collector.addFailure(errorMessage, correctiveAction).withInputSchemaField(field.getName());
        }

        for (String column : config.getColumnMappings(collector).keySet()) {
            if (!fieldNames.contains(column)) {
                collector.addFailure(
                    String.format("Column '%s' in column mappings does not exist in the input schema.", column),
                    String.format("Remove or modify column '%s' from column mappings.", column))
                    .withConfigProperty("columnMappings");
            }
        }
    }

    /**
     * Creates the target table with one column family per distinct family in the column mappings.
     * An I/O error is reported to the collector instead of being thrown.
     *
     * @param connection open Bigtable connection
     * @param tableName name of the table to create
     * @param collector collector receiving the failure if creation fails
     */
    private void createTable(Connection connection, TableName tableName, FailureCollector collector) {
        try (Admin admin = connection.getAdmin()) {
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            config.getColumnMappings(collector).values().stream()
                .map(HBaseColumn::getFamily)
                .distinct()
                .map(HColumnDescriptor::new)
                .forEach(tableDescriptor::addFamily);
            admin.createTable(tableDescriptor);
        } catch (IOException e) {
            collector.addFailure(
                String.format("Failed to create table '%s' in Bigtable : %s", tableName, e.getMessage()), null)
                .withConfigProperty("table")
                .withStacktrace(e.getStackTrace());
        }
    }

    /**
     * Reports a failure for every column mapping whose family is missing from the existing table.
     *
     * @param connection open Bigtable connection
     * @param tableName name of the existing table
     * @param collector collector receiving one failure per offending mapping
     * @throws IOException if the table descriptor cannot be read
     */
    private void validateExistingTable(Connection connection, TableName tableName, FailureCollector collector)
        throws IOException {
        try (Table table = connection.getTable(tableName)) {
            Set<String> existingFamilies = table.getTableDescriptor().getFamiliesKeys().stream()
                .map(Bytes::toString)
                .collect(Collectors.toSet());
            for (Map.Entry<String, HBaseColumn> entry : config.getColumnMappings(collector).entrySet()) {
                HBaseColumn column = entry.getValue();
                String family = column.getFamily();
                if (existingFamilies.contains(family)) {
                    continue;
                }
                collector.addFailure(
                    String.format("Column family '%s' does not exist in target table '%s'.", family, config.table),
                    String.format("Remove column family %s.", family))
                    .withConfigElement("columnMappings",
                                       ConfigUtil.getKVPair(entry.getKey(), column.getQualifiedName(), "="));
            }
        }
    }

    /**
     * Registers the external dataset for lineage and, when the input schema has fields, records
     * a write operation covering all of them.
     *
     * @param context sink context for the current run
     */
    private void emitLineage(BatchSinkContext context) {
        Schema inputSchema = context.getInputSchema();
        LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
        lineageRecorder.createExternalDataset(inputSchema);
        if (inputSchema == null) {
            return;
        }
        List<Schema.Field> fields = inputSchema.getFields();
        if (fields == null) {
            return;
        }
        List<String> fieldNames = fields.stream().map(Schema.Field::getName).collect(Collectors.toList());
        String operationDescription = String.format(
            "Wrote to Bigtable. Project: '%s', Instance: '%s'. Table: '%s'",
            config.getProject(), config.instance, config.table);
        lineageRecorder.recordWrite("Write", operationDescription, fieldNames);
    }
}

