/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.bigtable.source;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
import io.cdap.plugin.gcp.bigtable.source.BigtableInputFormat;
import io.cdap.plugin.gcp.bigtable.source.BigtableSourceConfig;
import io.cdap.plugin.gcp.bigtable.source.HBaseResultToRecordTransformer;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch source plugin that reads rows from Google Cloud Bigtable and emits each row as a
 * {@link StructuredRecord}.
 *
 * <p>Rows arrive as HBase {@link Result} objects and are converted with an
 * {@link HBaseResultToRecordTransformer} built from the stage's output schema, the configured key
 * alias and the configured column mappings.
 */
@Plugin(type="batchsource")
@Name(value="Bigtable")
@Description(value="This source reads data from Google Cloud Bigtable. Cloud Bigtable is Google's NoSQL Big Data database service.")
public final class BigtableSource
extends BatchSource<ImmutableBytesWritable, Result, StructuredRecord> {
    public static final String NAME = "Bigtable";
    private static final Logger LOG = LoggerFactory.getLogger(BigtableSource.class);
    private final BigtableSourceConfig config;
    /** Converts HBase results into records; created in {@link #initialize(BatchRuntimeContext)}. */
    private HBaseResultToRecordTransformer resultToRecordTransformer;

    /**
     * Creates the source.
     *
     * @param config plugin configuration used by every lifecycle method of this source
     */
    public BigtableSource(BigtableSourceConfig config) {
        this.config = config;
    }

    /**
     * Validates the configuration and registers the configured schema as the stage's output schema.
     *
     * <p>When a schema is configured and {@code connectionParamsConfigured()} reports true, the
     * requested column families are also checked against the table. That check is best effort
     * here: an {@link IOException} while building the configuration or talking to Bigtable is
     * logged as a warning and does not add a validation failure.
     *
     * @param configurer pipeline configurer supplying the stage configurer and failure collector
     */
    public void configurePipeline(PipelineConfigurer configurer) {
        super.configurePipeline(configurer);
        StageConfigurer stageConfigurer = configurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();
        this.config.validate(collector);
        Schema configuredSchema = this.config.getSchema(collector);
        if (configuredSchema != null && this.config.connectionParamsConfigured()) {
            try {
                Configuration conf = this.getConfiguration(collector);
                this.validateOutputSchema(conf, collector);
            }
            catch (IOException e) {
                // Deliberately non-fatal at configure time; prepareRun repeats the check strictly.
                LOG.warn("Failed to validate output schema", e);
            }
        }
        stageConfigurer.setOutputSchema(configuredSchema);
    }

    /**
     * Validates the configuration, verifies the requested column families against the table,
     * records lineage and registers {@link BigtableInputFormat} as the input of the run.
     *
     * <p>Any failure gathered by the collector aborts the run through
     * {@code getOrThrowException()}, including failures that were recorded without an exception
     * being thrown (for example a column family that does not exist in the table).
     *
     * @param context batch source context of the current run
     */
    public void prepareRun(BatchSourceContext context) {
        FailureCollector collector = context.getFailureCollector();
        this.config.validate(collector);
        Schema configuredSchema = this.config.getSchema(collector);
        // Fail before connecting to Bigtable when the configuration is already known to be invalid.
        collector.getOrThrowException();

        Configuration conf;
        try {
            conf = this.getConfiguration(collector);
        }
        catch (IOException e) {
            collector.addFailure(String.format("Failed to prepare configuration for job : %s", e.getMessage()), null).withConfigProperty("bigtableOptions").withStacktrace(e.getStackTrace());
            // A failure was just added, so this always throws; 'throw' makes that visible to the compiler.
            throw collector.getOrThrowException();
        }
        try {
            this.validateOutputSchema(conf, collector);
        }
        catch (IOException e) {
            collector.addFailure(String.format("Failed to connect to Bigtable : %s", e.getMessage()), null).withStacktrace(e.getStackTrace());
            throw collector.getOrThrowException();
        }
        // Failures recorded while building the scan or checking column families do not raise an
        // exception on their own; surface them before lineage and input are registered.
        collector.getOrThrowException();

        this.emitLineage(context, configuredSchema);
        context.setInput(Input.of(this.config.referenceName, new SourceInputFormatProvider(BigtableInputFormat.class, conf)));
    }

    /**
     * Creates the result-to-record transformer from the runtime output schema, the configured key
     * alias and the configured column mappings.
     *
     * @param context runtime context providing the output schema
     * @throws Exception if the superclass initialization fails
     */
    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        this.resultToRecordTransformer = new HBaseResultToRecordTransformer(context.getOutputSchema(), this.config.keyAlias, this.config.getColumnMappings());
    }

    /**
     * Converts one Bigtable row into a {@link StructuredRecord} and emits it.
     *
     * <p>If conversion or emission throws, the configured error handling decides the outcome:
     * {@code SKIP} logs a warning and drops the row, {@code FAIL_PIPELINE} rethrows the problem
     * wrapped in a {@link RuntimeException}.
     *
     * @param input key/value pair whose value is the HBase result to convert
     * @param emitter receiver of the converted record
     * @throws RuntimeException if processing fails and the strategy is {@code FAIL_PIPELINE}
     * @throws IllegalStateException if processing fails and the strategy is not a handled value
     */
    public void transform(KeyValue<ImmutableBytesWritable, Result> input, Emitter<StructuredRecord> emitter) {
        try {
            StructuredRecord record = this.resultToRecordTransformer.transform(input.getValue());
            emitter.emit(record);
        }
        catch (Exception e) {
            switch (this.config.getErrorHandling()) {
                case SKIP: {
                    LOG.warn("Failed to process message, skipping it", e);
                    break;
                }
                case FAIL_PIPELINE: {
                    throw new RuntimeException("Failed to process message", e);
                }
                default: {
                    // Keep the original processing error attached as the cause.
                    throw new IllegalStateException(String.format("Unknown error handling strategy '%s'", this.config.getErrorHandling()), e);
                }
            }
        }
    }

    /**
     * Builds the Hadoop configuration handed to the input format.
     *
     * <p>Sets the service account properties (key file path or inline JSON, depending on
     * {@code isServiceAccountFilePath()}), applies the Bigtable project and instance, sets the
     * input table, copies every user supplied Bigtable option and stores the serialized scan.
     *
     * @param collector failure collector passed on to the scan construction
     * @return the populated configuration
     * @throws IOException if the Bigtable configuration or scan serialization fails
     */
    private Configuration getConfiguration(FailureCollector collector) throws IOException {
        Configuration conf = new Configuration();
        String serviceAccount = this.config.getServiceAccount();
        if (serviceAccount != null) {
            conf.setBoolean("google.bigtable.auth.service.account.enable", true);
            String credentialKey = this.config.isServiceAccountFilePath().booleanValue()
                ? "google.bigtable.auth.json.keyfile"
                : "google.bigtable.auth.json.value";
            conf.set(credentialKey, serviceAccount);
        }
        BigtableConfiguration.configure(conf, this.config.getProject(), this.config.instance);
        conf.setBoolean("hbase.mapreduce.inputtable.shufflemaps", true);
        conf.set("hbase.mapreduce.inputtable", this.config.table);
        // Applied after the defaults above, so a user supplied option wins on a key collision.
        this.config.getBigtableOptions().forEach(conf::set);
        Scan scan = this.getConfiguredScanForJob(collector);
        conf.set("hbase.mapreduce.scan", TableMapReduceUtil.convertScanToString(scan));
        return conf;
    }

    /**
     * Checks that the column family of every requested column exists in the configured table and
     * records one failure per column whose family is missing.
     *
     * @param configuration configuration used to open the Bigtable connection
     * @param collector failure collector receiving one entry per unknown column family
     * @throws IOException if connecting to Bigtable or reading the table descriptor fails
     */
    private void validateOutputSchema(Configuration configuration, FailureCollector collector) throws IOException {
        TableName tableName = TableName.valueOf(this.config.table);
        try (Connection connection = BigtableConfiguration.connect(configuration);
             Table table = connection.getTable(tableName)) {
            Set<String> existingFamilies = table.getTableDescriptor().getFamiliesKeys().stream()
                .map(Bytes::toString)
                .collect(Collectors.toSet());
            // Looked up once; the mappings do not change while the columns are being checked.
            Map<String, String> columnMappings = this.config.getColumnMappings();
            for (HBaseColumn hBaseColumn : this.config.getRequestedColumns(collector)) {
                if (existingFamilies.contains(hBaseColumn.getFamily())) {
                    continue;
                }
                String key = hBaseColumn.getQualifiedName();
                collector.addFailure(String.format("Column family '%s' does not exist.", hBaseColumn.getFamily()), "Specify correct column family.").withConfigElement("columnMappings", ConfigUtil.getKVPair(key, columnMappings.get(key), "="));
            }
        }
    }

    /**
     * Registers the external dataset for lineage and, when the schema exposes fields, records a
     * read operation listing those field names.
     *
     * @param context batch source context used to create the lineage recorder
     * @param schema configured output schema; must not be null
     * @throws NullPointerException if {@code schema} is null
     */
    private void emitLineage(BatchSourceContext context, Schema schema) {
        Objects.requireNonNull(schema, "Output schema must be configured to record lineage");
        LineageRecorder lineageRecorder = new LineageRecorder(context, this.config.referenceName);
        lineageRecorder.createExternalDataset(schema);
        if (schema.getFields() == null) {
            return;
        }
        List<String> fieldNames = schema.getFields().stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        String operationDescription = String.format("Read from Bigtable. Project: '%s', Instance: '%s'. Table: '%s'", this.config.getProject(), this.config.instance, this.config.table);
        lineageRecorder.recordRead("Read", operationDescription, fieldNames);
    }

    /**
     * Builds the scan describing which rows and columns the job reads.
     *
     * <p>A time range is applied only when at least one bound is configured; a missing start
     * defaults to {@code 0} and a missing stop to {@link Long#MAX_VALUE}. Block caching is turned
     * off, optional start and stop rows are applied, and every requested column is added.
     *
     * @param collector failure collector receiving a failure if the time range is rejected
     * @return the configured scan
     */
    private Scan getConfiguredScanForJob(FailureCollector collector) {
        Scan scan = new Scan();
        try {
            if (this.config.scanTimeRangeStart != null || this.config.scanTimeRangeStop != null) {
                long rangeStart = ObjectUtils.defaultIfNull(this.config.scanTimeRangeStart, 0L);
                long rangeStop = ObjectUtils.defaultIfNull(this.config.scanTimeRangeStop, Long.MAX_VALUE);
                scan.setTimeRange(rangeStart, rangeStop);
            }
        }
        catch (IOException e) {
            collector.addFailure(String.format("Unable to set time range configuration : %s", e.getMessage()), null).withConfigProperty("scanTimeRangeStart").withConfigProperty("scanTimeRangeStop").withStacktrace(e.getStackTrace());
        }
        scan.setCacheBlocks(false);
        if (this.config.scanRowStart != null) {
            scan.withStartRow(Bytes.toBytes(this.config.scanRowStart));
        }
        if (this.config.scanRowStop != null) {
            scan.withStopRow(Bytes.toBytes(this.config.scanRowStop));
        }
        for (HBaseColumn hBaseColumn : this.config.getRequestedColumns(collector)) {
            scan.addColumn(Bytes.toBytes(hBaseColumn.getFamily()), Bytes.toBytes(hBaseColumn.getQualifier()));
        }
        return scan;
    }
}

