/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.gcp.dataplex.source;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.dataplex.v1.DataplexServiceClient;
import com.google.cloud.dataplex.v1.Entity;
import com.google.cloud.dataplex.v1.EntityName;
import com.google.cloud.dataplex.v1.LakeName;
import com.google.cloud.dataplex.v1.MetadataServiceClient;
import com.google.cloud.dataplex.v1.StorageSystem;
import com.google.cloud.dataplex.v1.Task;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.JobUtils;
import io.cdap.plugin.gcp.bigquery.source.BigQueryAvroToStructuredTransformer;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.dataplex.common.util.DataplexUtil;
import io.cdap.plugin.gcp.dataplex.source.DataplexInputFormatProvider;
import io.cdap.plugin.gcp.dataplex.source.config.DataplexBatchSourceConfig;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="batchsource")
@Name(value="Dataplex")
@Description(value="Dataplex Source")
public class DataplexBatchSource
extends BatchSource<Object, Object, StructuredRecord> {
    /** Plugin name; the same literal is used in the {@code @Name} annotation on this class. */
    public static final String NAME = "Dataplex";
    // Not referenced anywhere in this file; same literal prefix as BQ_TEMP_BUCKET_NAME_TEMPLATE.
    private static final String BQ_TEMP_BUCKET_NAME_PREFIX = "dataplex-bq-source-bucket-";
    // Format string for the default BigQuery staging bucket name; %s is filled with the per-run bucket path.
    private static final String BQ_TEMP_BUCKET_NAME_TEMPLATE = "dataplex-bq-source-bucket-%s";
    // Configuration key read in onRunFinish to find the temporary BigQuery table to delete.
    private static final String CONFIG_TEMPORARY_TABLE_NAME = "cdap.bq.source.temporary.table.name";
    // Temporary GCS bucket used for non-BigQuery entities. The UUID suffix is computed once, when the class
    // is initialized, so every instance loaded by the same class loader shares this bucket name.
    private static final String GCS_TEMP_BUCKET_NAME = "dataplex-cdf-" + UUID.randomUUID();
    // Key under which the Dataplex task arguments are stored in the task's execution spec.
    private static final String DATAPLEX_TASK_ARGS = "TASK_ARGS";
    private static final Logger LOG = LoggerFactory.getLogger(DataplexBatchSource.class);
    // NOTE(review): the five fields below are static and mutable, so they are shared by every instance of
    // this class within a class loader; confirm this is intended (e.g. two Dataplex sources in one pipeline).
    // Dataplex entity resolved in configurePipeline / prepareRun / initialize.
    private static Entity entity;
    // BigQuery dataset, project and table parsed from the entity data path (BigQuery entities only).
    private static String dataset;
    private static String datasetProject;
    // Schema passed to the Avro transformer in transform(); may be null.
    private static Schema outputSchema;
    private static String tableId;
    private final BigQueryAvroToStructuredTransformer transformer = new BigQueryAvroToStructuredTransformer();
    private final DataplexBatchSourceConfig config;
    // Hadoop configuration built in prepareRun and read again in onRunFinish; null until prepareRun sets it.
    private Configuration configuration;
    // Random per-run path component for the BigQuery staging location; set in prepareRunBigQueryDataset.
    private String bucketPath;

    /**
     * Creates the source with the given plugin configuration.
     *
     * @param dataplexBatchSourceConfig configuration stored and consulted by every lifecycle method
     */
    public DataplexBatchSource(DataplexBatchSourceConfig dataplexBatchSourceConfig) {
        this.config = dataplexBatchSourceConfig;
    }

    /**
     * Validates the plugin configuration when the pipeline is configured.
     *
     * <p>When the connection, service account or project cannot be resolved yet, only the validating
     * input format is set up (with a {@code null} entity) and the method returns. Otherwise the Dataplex
     * entity is resolved: BigQuery entities get their dataset validated and, if no schema is configured,
     * an output schema derived from the entity schema; all other entities get a metastore check and the
     * validating input format.
     *
     * @param pipelineConfigurer configurer supplying the stage configurer and failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        StageConfigurer configurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = configurer.getFailureCollector();
        if (!this.config.getConnection().canConnect()
            || this.config.getServiceAccountType() == null
            || (this.config.isServiceAccountFilePath().booleanValue() && this.config.autoServiceAccountUnavailable())
            || this.config.tryGetProject() == null) {
            this.config.setupValidatingInputFormat(pipelineConfigurer, collector, null);
            return;
        }
        GoogleCredentials credentials = this.config.validateAndGetServiceAccountCredentials(collector);
        collector.getOrThrowException();
        try {
            entity = this.config.getAndValidateEntityConfiguration(collector, credentials);
        } catch (IOException e) {
            // An IOException is not guaranteed to carry a cause; fall back to its own message
            // (and finally to its string form) instead of dereferencing a null cause.
            Throwable cause = e.getCause();
            String message = cause != null ? cause.getMessage() : null;
            if (message == null) {
                message = e.getMessage() != null ? e.getMessage() : e.toString();
            }
            collector.addFailure(message, "Please check credentials");
            return;
        }
        if (entity == null) {
            this.config.setupValidatingInputFormat(pipelineConfigurer, collector, null);
            return;
        }
        if (entity.getSystem().equals(StorageSystem.BIGQUERY)) {
            this.getEntityValuesFromDataPathForBQEntities(entity.getDataPath());
            this.config.validateBigQueryDataset(collector, datasetProject, dataset, tableId);
            if (this.config.getSchema(collector) == null) {
                Schema configuredSchema = DataplexUtil.getTableSchema(entity.getSchema(), collector);
                configurer.setOutputSchema(configuredSchema);
            }
            return;
        }
        this.config.checkMetastoreForGCSEntity(collector, credentials);
        this.config.setupValidatingInputFormat(pipelineConfigurer, collector, entity);
    }

    /**
     * Resolves the Dataplex entity and prepares the run for either a BigQuery or a GCS entity.
     *
     * @param context batch source context supplying the failure collector and receiving the input
     * @throws IOException if the configured entity does not exist
     * @throws Exception if credential validation or the entity-specific preparation fails
     */
    @Override
    public void prepareRun(BatchSourceContext context) throws Exception {
        FailureCollector failures = context.getFailureCollector();
        GoogleCredentials googleCredentials = this.config.validateAndGetServiceAccountCredentials(failures);
        failures.getOrThrowException();

        entity = this.config.getAndValidateEntityConfiguration(failures, googleCredentials);
        if (entity == null) {
            String message = String.format("Pipeline failed. Entity %s does not exist", this.config.getEntity());
            throw new IOException(message);
        }

        boolean isBigQueryEntity = entity.getSystem().equals(StorageSystem.BIGQUERY);
        if (!isBigQueryEntity) {
            // Every non-BigQuery entity is handled through the GCS path.
            this.config.checkMetastoreForGCSEntity(failures, googleCredentials);
            this.prepareRunStorageBucket(context);
            return;
        }
        this.getEntityValuesFromDataPathForBQEntities(entity.getDataPath());
        this.config.validateBigQueryDataset(failures, datasetProject, dataset, tableId);
        this.prepareRunBigQueryDataset(context);
    }

    /**
     * Splits the entity data path on {@code '/'} and stores the project, dataset and table in the
     * static fields. Paths with fewer than three segments leave the fields untouched.
     *
     * @param dataPath data path of a BigQuery entity
     */
    private void getEntityValuesFromDataPathForBQEntities(String dataPath) {
        String[] segments = dataPath.split("/");
        int count = segments.length;
        if (count < 3) {
            return;
        }
        // Dataset is third from the end, project is the second segment, table is the last one.
        dataset = segments[count - 3];
        datasetProject = segments[1];
        tableId = segments[count - 1];
    }

    /**
     * Loads the configured output schema and fetches the Dataplex entity through the metadata service.
     *
     * @param context runtime context supplying the failure collector
     * @throws Exception if the parent initialization or the metadata lookup fails
     */
    @Override
    public void initialize(BatchRuntimeContext context) throws Exception {
        super.initialize(context);
        outputSchema = this.config.getSchema(context.getFailureCollector());
        GoogleCredentials clientCredentials = this.config.getCredentials(context.getFailureCollector());
        try (MetadataServiceClient client = DataplexUtil.getMetadataServiceClient(clientCredentials)) {
            EntityName entityName = EntityName.newBuilder()
                .setProject(this.config.tryGetProject())
                .setLocation(this.config.getLocation())
                .setLake(this.config.getLake())
                .setZone(this.config.getZone())
                .setEntity(this.config.getEntity())
                .build();
            entity = client.getEntity(entityName);
        }
    }

    /**
     * Prepares a run that reads a BigQuery entity: builds the BigQuery configuration, makes sure the
     * staging bucket exists, configures the BigQuery input, emits lineage and registers the input.
     *
     * @param context batch source context supplying runtime arguments and receiving the input
     * @throws Exception if the staging bucket or the BigQuery input cannot be set up
     */
    private void prepareRunBigQueryDataset(BatchSourceContext context) throws Exception {
        FailureCollector failures = context.getFailureCollector();
        outputSchema = DataplexUtil.getTableSchema(entity.getSchema(), failures);

        String serviceAccount = this.config.getServiceAccount();
        GoogleCredentials googleCredentials = this.config.getCredentials(failures);
        BigQuery bigQueryClient = GCPUtils.getBigQuery(datasetProject, googleCredentials);

        // The same random value serves as bucket suffix and as directory names in the temporary GCS path.
        this.bucketPath = UUID.randomUUID().toString();
        this.configuration = BigQueryUtil.getBigQueryConfig(
            serviceAccount, this.config.getProject(), null, this.config.getServiceAccountType());

        Map<String, String> runtimeArguments = context.getArguments().asMap();
        String location = this.config.getLocation();
        DatasetId sourceDatasetId = DatasetId.of(datasetProject, dataset);
        Dataset sourceDataset = bigQueryClient.getDataset(sourceDatasetId, new BigQuery.DatasetOption[0]);
        String requestedBucket = BigQueryUtil.getStagingBucketName(runtimeArguments, location, sourceDataset, null);
        String stagingBucket = this.createBucket(
            this.configuration, this.config.getProject(), bigQueryClient, googleCredentials, requestedBucket,
            this.bucketPath);

        this.configureServiceAccount(this.configuration, this.config.getConnection());
        this.configureBigQuerySource();

        String temporaryGcsPath = BigQuerySourceUtils.getTemporaryGcsPath(stagingBucket, this.bucketPath, this.bucketPath);
        BigQuerySourceUtils.configureBigQueryInput(this.configuration, sourceDatasetId, tableId, temporaryGcsPath);
        this.configuration.set("dataplex.source.entity.type", entity.getSystem().toString());

        TableDefinition.Type tableType = this.config.getSourceTableType(datasetProject, dataset, tableId);
        this.emitLineage(context, outputSchema, tableType);

        String referenceName = this.config.getReferenceName(BigQueryUtil.getFQN(datasetProject, dataset, tableId));
        InputFormatProvider provider = new DataplexInputFormatProvider(this.configuration);
        context.setInput(Input.of(referenceName, provider));
    }

    /**
     * Copies the optional partition range and filter from the plugin configuration into the Hadoop
     * configuration; values that are {@code null} are skipped.
     */
    private void configureBigQuerySource() {
        String partitionFrom = this.config.getPartitionFrom();
        if (partitionFrom != null) {
            this.configuration.set("cdap.bq.source.partition.from.date", partitionFrom);
        }

        String partitionTo = this.config.getPartitionTo();
        if (partitionTo != null) {
            this.configuration.set("cdap.bq.source.partition.to.date", partitionTo);
        }

        String filter = this.config.getFilter();
        if (filter != null) {
            this.configuration.set("cdap.bq.source.filter", filter);
        }
    }

    /**
     * Registers the BigQuery entity as an external dataset and records a field-level read operation.
     *
     * <p>The read operation is only recorded when the schema and its field list are both present,
     * matching the guard used for GCS entities in {@code prepareRunStorageBucket}.
     *
     * @param context batch source context used to create the lineage recorder
     * @param schema schema of the source table; may be {@code null}
     * @param sourceTableType BigQuery table type, used only to word the lineage description
     */
    private void emitLineage(BatchSourceContext context, Schema schema, TableDefinition.Type sourceTableType) {
        this.getEntityValuesFromDataPathForBQEntities(entity.getDataPath());
        String fqn = BigQueryUtil.getFQN(datasetProject, dataset, tableId);
        Asset asset = Asset.builder(this.config.getReferenceName(fqn))
            .setFqn(fqn)
            .setLocation(this.config.getLocation())
            .build();
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext) context, asset);
        lineageRecorder.createExternalDataset(schema);

        String type = "table";
        if (TableDefinition.Type.VIEW == sourceTableType) {
            type = "view";
        } else if (TableDefinition.Type.MATERIALIZED_VIEW == sourceTableType) {
            type = "materialized view";
        }

        // Guard against a missing schema before dereferencing it.
        if (schema != null && schema.getFields() != null) {
            List<String> fieldNames = schema.getFields().stream()
                .map(Schema.Field::getName)
                .collect(Collectors.toList());
            this.recordLineage(lineageRecorder, fieldNames,
                String.format("Read from BigQuery Entity %s '%s' from Dataplex.", type, tableId));
        }
    }

    /**
     * Prepares a run that reads a GCS entity: makes sure the temporary bucket exists, creates a Dataplex
     * task that writes the query result into it, records lineage and registers the bucket as the input.
     *
     * @param context batch source context supplying the failure collector and receiving the input
     * @throws IOException if the bucket cannot be created or the input path does not exist
     * @throws ExecutionException if the Dataplex task creation fails
     * @throws InterruptedException if the thread is interrupted while waiting for the task creation
     * @throws InstantiationException declared by the original signature and kept for compatibility
     */
    private void prepareRunStorageBucket(BatchSourceContext context) throws InstantiationException, IOException, ExecutionException, InterruptedException {
        FailureCollector collector = context.getFailureCollector();
        Job job = JobUtils.createInstance();
        this.configuration = job.getConfiguration();
        Storage storage = GCPUtils.getStorage(this.config.getProject(), this.config.getCredentials(collector));
        this.createBucket(this.configuration, storage, this.config.getLocation(), GCS_TEMP_BUCKET_NAME);
        String outputLocation = "gs://" + GCS_TEMP_BUCKET_NAME;

        // The Dataplex task runs the query and writes its result to the temporary bucket.
        String query = this.formatQuery(entity, context.isPreviewEnabled());
        String taskId = this.createTask(outputLocation, query, collector);
        this.setConfigurationForDataplex(taskId);

        ValidatingInputFormat validatingInputFormat = this.config.getValidatingInputFormat(context);
        FileInputFormat.setInputDirRecursive(job, true);

        Schema schema = DataplexUtil.getTableSchema(entity.getSchema(), collector);
        Asset asset = Asset.builder(this.config.getReferenceName(entity.getDataPath()))
            .setFqn(entity.getDataPath())
            .setLocation(this.config.getLocation())
            .build();
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext) context, asset);
        lineageRecorder.createExternalDataset(schema);
        if (schema != null && schema.getFields() != null) {
            List<String> fieldNames = schema.getFields().stream()
                .map(Schema.Field::getName)
                .collect(Collectors.toList());
            this.recordLineage(lineageRecorder, fieldNames, "Read from GCS entity in Dataplex.");
        }

        for (Map.Entry<String, String> entry : this.config.getFileSystemProperties(outputLocation).entrySet()) {
            this.configuration.set(entry.getKey(), entry.getValue());
        }

        Path path = new Path(outputLocation);
        FileSystem pathFileSystem = FileSystem.get(path.toUri(), this.configuration);
        FileStatus[] fileStatus = pathFileSystem.globStatus(path);
        if (fileStatus == null) {
            throw new IOException(String.format("Input path %s does not exist", path));
        }
        FileInputFormat.addInputPath(job, path);

        // Typed map instead of the raw Map / Map.Entry left by the decompiler, so no casts are needed.
        Map<String, String> inputFormatConfiguration = validatingInputFormat.getInputFormatConfiguration();
        for (Map.Entry<String, String> propertyEntry : inputFormatConfiguration.entrySet()) {
            this.configuration.set(propertyEntry.getKey(), propertyEntry.getValue());
        }

        this.configuration.set("dataplex.source.entity.type", entity.getSystem().toString());
        context.setInput(Input.of(this.config.getReferenceName(entity.getDataPath()),
            new DataplexInputFormatProvider(this.configuration)));
    }

    /**
     * Stores the Dataplex task coordinates and the service account settings in the Hadoop configuration.
     *
     * @param taskId identifier of the Dataplex task created for this run
     */
    private void setConfigurationForDataplex(String taskId) {
        Configuration conf = this.configuration;
        conf.set("dataplex.task.id", taskId);
        conf.set("dataplex.gcp.project.id", this.config.tryGetProject());
        conf.set("dataplex.location.id", this.config.getLocation());
        conf.set("dataplex.lake.id", this.config.getLake());
        conf.set("cdap.gcs.auth.service.account.type.flag", this.config.getServiceAccountType());

        // "none" stands in for a missing service account file path.
        String serviceAccountFilePath = this.config.getServiceAccountFilePath();
        if (serviceAccountFilePath == null) {
            serviceAccountFilePath = "none";
        }
        conf.set("cdap.gcs.auth.service.account.type.filepath", serviceAccountFilePath);
    }

    /**
     * Records a read operation named {@code "Read"} for the given output fields.
     *
     * @param lineageRecorder recorder bound to the asset being read
     * @param outputFields names of the fields produced by the read
     * @param description human-readable description of the operation
     */
    private void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields, String description) {
        final String operationName = "Read";
        lineageRecorder.recordRead(operationName, description, outputFields);
    }

    /**
     * Cleans up the temporary resources created by {@code prepareRun}.
     *
     * <p>For BigQuery entities the temporary GCS directory is removed and, if the configuration holds a
     * temporary table name, that table is deleted. For other entities the temporary GCS directory and
     * bucket are deleted. If {@code prepareRun} failed before the entity or the configuration was set,
     * there is nothing to clean up and the method returns.
     *
     * @param succeeded whether the run succeeded (not used)
     * @param context batch source context supplying the failure collector
     */
    @Override
    public void onRunFinish(boolean succeeded, BatchSourceContext context) {
        if (entity == null || this.configuration == null) {
            // prepareRun did not get far enough to create any temporary resource.
            LOG.debug("Skipping Dataplex source cleanup because the run was never fully prepared.");
            return;
        }
        if (entity.getSystem().equals(StorageSystem.BIGQUERY)) {
            BigQuerySourceUtils.deleteGcsTemporaryDirectory(this.configuration, null, this.bucketPath);
            // Nothing in this class sets this key, so it may be absent; TableId.of rejects a null table name.
            String temporaryTable = this.configuration.get(CONFIG_TEMPORARY_TABLE_NAME);
            if (temporaryTable == null) {
                return;
            }
            GoogleCredentials credentials = this.config.getCredentials(context.getFailureCollector());
            BigQuery bigQuery = GCPUtils.getBigQuery(this.config.getProject(), credentials);
            bigQuery.delete(TableId.of(datasetProject, dataset, temporaryTable));
            LOG.debug("Deleted temporary table '{}'", temporaryTable);
        } else {
            Storage storage = GCPUtils.getStorage(this.config.tryGetProject(),
                this.config.getCredentials(context.getFailureCollector()));
            BigQuerySourceUtils.deleteGcsTemporaryDirectory(this.configuration, GCS_TEMP_BUCKET_NAME, "projects");
            storage.delete(GCS_TEMP_BUCKET_NAME, new Storage.BucketSourceOption[0]);
            LOG.debug("Deleted temporary bucket '{}'.", GCS_TEMP_BUCKET_NAME);
        }
    }

    /**
     * Creates an on-demand Dataplex Spark SQL task and waits for the creation to complete.
     *
     * @param outputLocation location passed to the task as {@code --output_location}
     * @param query SQL script the task runs
     * @param collector failure collector used when resolving credentials
     * @return the description of the created task, which is also used as its task id
     * @throws IOException if the Dataplex client cannot be created
     * @throws ExecutionException if the task creation fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private String createTask(String outputLocation, String query, FailureCollector collector) throws IOException, ExecutionException, InterruptedException {
        String argsTemplate = "--output_location,%s, --output_format, %s";

        Task.TriggerSpec onDemandTrigger = Task.TriggerSpec.newBuilder()
            .setType(Task.TriggerSpec.Type.ON_DEMAND)
            .build();
        Task.ExecutionSpec execution = Task.ExecutionSpec.newBuilder()
            .setServiceAccount(this.config.getServiceAccountEmail())
            .putArgs(DATAPLEX_TASK_ARGS, String.format(argsTemplate, outputLocation, "avro"))
            .build();
        Task.SparkTaskConfig sparkConfig = Task.SparkTaskConfig.newBuilder()
            .setSqlScript(query)
            .build();
        Task request = Task.newBuilder()
            .setTriggerSpec(onDemandTrigger)
            .setDescription("task-" + UUID.randomUUID())
            .setExecutionSpec(execution)
            .setSpark(sparkConfig)
            .build();

        Task created;
        try (DataplexServiceClient client = DataplexUtil.getDataplexServiceClient(this.config.getCredentials(collector))) {
            LakeName lake = LakeName.newBuilder()
                .setLake(this.config.getLake())
                .setProject(this.config.tryGetProject())
                .setLocation(this.config.getLocation())
                .build();
            // The random description doubles as the task id.
            created = client.createTaskAsync(lake, request, request.getDescription()).get();
        }
        return created.getDescription();
    }

    /**
     * Builds the {@code select *} statement for the entity, qualified by the configured zone.
     *
     * @param entity entity whose id is used as the table name
     * @param isPreviewEnabled when {@code true}, the statement is limited to 1000 rows
     * @return the SQL statement, terminated with a semicolon
     */
    private String formatQuery(Entity entity, boolean isPreviewEnabled) {
        String filter = this.config.getFilter();
        String whereClause = Strings.isNullOrEmpty(filter) ? "" : "where " + filter;
        String terminator = isPreviewEnabled ? " LIMIT 1000;" : ";";
        return "select * from " + this.config.getZone() + "." + entity.getId() + " " + whereClause + terminator;
    }

    /**
     * Converts one input record into a {@link StructuredRecord} and emits it.
     *
     * <p>For BigQuery entities the value is an Avro record that is converted with the transformer,
     * using the output schema when one is available. For other entities the value is emitted as is.
     *
     * @param input key/value pair produced by the input format
     * @param emitter emitter receiving the structured record
     * @throws IOException if the Avro record cannot be converted
     */
    @Override
    public void transform(KeyValue<Object, Object> input, Emitter<StructuredRecord> emitter) throws IOException {
        if (!entity.getSystem().equals(StorageSystem.BIGQUERY)) {
            // Typed cast: Emitter<StructuredRecord> does not accept an Object argument.
            emitter.emit((StructuredRecord) input.getValue());
            return;
        }
        GenericRecord avroRecord = (GenericData.Record) input.getValue();
        StructuredRecord transformed = outputSchema == null
            ? this.transformer.transform(avroRecord)
            : this.transformer.transform(avroRecord, outputSchema);
        emitter.emit(transformed);
    }

    /**
     * Makes sure the BigQuery staging bucket exists, in the same location as the source dataset.
     *
     * <p>When no bucket name is given, a name is derived from {@code bucketPath} and bucket deletion is
     * enabled in the configuration.
     *
     * @param configuration Hadoop configuration that receives the bucket-deletion flag
     * @param project project used to create the Storage client
     * @param bigQuery BigQuery client used to look up the source dataset
     * @param credentials credentials used to create the Storage client
     * @param bucket bucket name to use, or {@code null} to derive one
     * @param bucketPath value used to derive the bucket name when {@code bucket} is {@code null}
     * @return the name of the bucket that is used
     * @throws IOException if the source dataset does not exist or the bucket cannot be created
     */
    private String createBucket(Configuration configuration, String project, BigQuery bigQuery, Credentials credentials, @Nullable String bucket, String bucketPath) throws IOException {
        if (bucket == null) {
            bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, bucketPath);
            configuration.setBoolean("fs.gs.bucket.delete.enable", true);
        }
        Dataset bigQueryDataset = bigQuery.getDataset(DatasetId.of(datasetProject, dataset), new BigQuery.DatasetOption[0]);
        if (bigQueryDataset == null) {
            // getDataset returns null for a missing dataset; fail with a clear message instead of an NPE.
            throw new IOException(String.format(
                "Unable to create Cloud Storage bucket '%s'. BigQuery dataset '%s.%s' does not exist.",
                bucket, datasetProject, dataset));
        }
        this.createBucket(configuration, GCPUtils.getStorage(project, credentials), bigQueryDataset.getLocation(), bucket);
        return bucket;
    }

    /**
     * Copies the service account and its "is a file path" flag into the Hadoop configuration; does
     * nothing when no service account is configured.
     *
     * @param configuration Hadoop configuration to update
     * @param config connector configuration holding the service account settings
     */
    private void configureServiceAccount(Configuration configuration, GCPConnectorConfig config) {
        String serviceAccount = config.getServiceAccount();
        if (serviceAccount == null) {
            return;
        }
        configuration.set("cdap.bq.service.account", serviceAccount);
        boolean isFilePath = config.isServiceAccountFilePath().booleanValue();
        configuration.setBoolean("cdap.bq.service.account.isfile", isFilePath);
    }

    /**
     * Creates the bucket when a Storage client is available and the bucket does not exist yet.
     *
     * <p>Bucket deletion is enabled in the configuration before the creation attempt. A 409 response
     * from the creation call is treated as success.
     *
     * @param configuration Hadoop configuration that receives the bucket-deletion flag
     * @param storage Storage client; when {@code null} nothing is created
     * @param location location in which the bucket is created
     * @param bucket name of the bucket
     * @return the bucket name that was passed in
     * @throws IOException if the bucket cannot be created for any reason other than a 409 conflict
     */
    private String createBucket(Configuration configuration, Storage storage, String location, @Nullable String bucket) throws IOException {
        boolean bucketMissing = storage != null && storage.get(bucket, new Storage.BucketGetOption[0]) == null;
        if (!bucketMissing) {
            return bucket;
        }
        try {
            configuration.setBoolean("fs.gs.bucket.delete.enable", true);
            GCPUtils.createBucket(storage, bucket, location, null);
        } catch (StorageException e) {
            // Any status other than 409 is a real failure.
            if (e.getCode() != 409) {
                throw new IOException(String.format("Unable to create Cloud Storage bucket '%s'. ", bucket), e);
            }
        }
        return bucket;
    }
}

