/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spark.bigquery.write.context;

import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.cloud.spark.bigquery.AvroSchemaConverter;
import com.google.cloud.spark.bigquery.PartitionOverwriteMode;
import com.google.cloud.spark.bigquery.SchemaConverters;
import com.google.cloud.spark.bigquery.SchemaConvertersConfiguration;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.SparkBigQueryUtil;
import com.google.cloud.spark.bigquery.metrics.SparkBigQueryConnectorMetricsUtils;
import com.google.cloud.spark.bigquery.write.BigQueryWriteHelper;
import com.google.cloud.spark.bigquery.write.IntermediateDataCleaner;
import com.google.cloud.spark.bigquery.write.context.BigQueryIndirectDataWriterContextFactory;
import com.google.cloud.spark.bigquery.write.context.BigQueryIndirectWriterCommitMessageContext;
import com.google.cloud.spark.bigquery.write.context.DataSourceWriterContext;
import com.google.cloud.spark.bigquery.write.context.DataWriterContextFactory;
import com.google.cloud.spark.bigquery.write.context.WriterCommitMessageContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DataSourceWriterContext} for the indirect write path: the writers created by
 * {@link #createWriterContextFactory()} stage Avro files under {@code gcsPath}, and
 * {@link #commit(WriterCommitMessageContext[])} loads the staged files into BigQuery with a
 * load job.
 *
 * <p>The staging path is cleaned (when an {@link IntermediateDataCleaner} was supplied) after
 * both {@code commit} and {@code abort}, regardless of whether they succeed.
 */
public class BigQueryIndirectDataSourceWriterContext
implements DataSourceWriterContext {
    private static final Logger logger = LoggerFactory.getLogger(BigQueryIndirectDataSourceWriterContext.class);
    private final BigQueryClient bigQueryClient;
    private final SparkBigQueryConfig config;
    private final Configuration hadoopConfiguration;
    private final StructType sparkSchema;
    private final String writeUUID;
    private final SaveMode saveMode;
    private final Path gcsPath;
    private final Optional<IntermediateDataCleaner> intermediateDataCleaner;
    // Set through setTableInfo(); when present its schema is used to adjust the load schema.
    private Optional<TableInfo> tableInfo = Optional.empty();
    // Set only on the dynamic-partition-overwrite path; redirects loadDataToBigQuery() to it.
    private Optional<TableId> temporaryTableId = Optional.empty();
    private final JobInfo.WriteDisposition writeDisposition;
    private final SparkContext sparkContext;

    /**
     * Creates the writer context.
     *
     * @param bigQueryClient client used for table checks, temp-table creation and load jobs
     * @param config connector configuration (destination table, load options, overwrite mode)
     * @param hadoopConfiguration Hadoop configuration handed to the per-task writers
     * @param sparkSchema schema of the rows being written
     * @param writeUUID identifier of this write, used in the abort log message
     * @param saveMode Spark save mode; converted once into the load job's write disposition
     * @param gcsPath staging location the writers write to
     * @param intermediateDataCleaner optional cleaner invoked after commit/abort
     * @param sparkContext Spark context passed to the write-session metrics call
     */
    public BigQueryIndirectDataSourceWriterContext(BigQueryClient bigQueryClient, SparkBigQueryConfig config, Configuration hadoopConfiguration, StructType sparkSchema, String writeUUID, SaveMode saveMode, Path gcsPath, Optional<IntermediateDataCleaner> intermediateDataCleaner, SparkContext sparkContext) {
        this.bigQueryClient = bigQueryClient;
        this.config = config;
        this.hadoopConfiguration = hadoopConfiguration;
        this.sparkSchema = sparkSchema;
        this.writeUUID = writeUUID;
        this.saveMode = saveMode;
        this.gcsPath = gcsPath;
        this.intermediateDataCleaner = intermediateDataCleaner;
        this.writeDisposition = SparkBigQueryUtil.saveModeToWriteDisposition(saveMode);
        this.sparkContext = sparkContext;
    }

    /**
     * Builds the factory for the per-task writers, passing it the Hadoop configuration, the
     * staging path, the Spark schema and the Avro schema (as a string) derived from it.
     *
     * @return a new {@link BigQueryIndirectDataWriterContextFactory}
     */
    @Override
    public DataWriterContextFactory<InternalRow> createWriterContextFactory() {
        Schema avroSchema = AvroSchemaConverter.sparkSchemaToAvroSchema(this.sparkSchema);
        return new BigQueryIndirectDataWriterContextFactory(new SerializableConfiguration(this.hadoopConfiguration), this.gcsPath.toString(), this.sparkSchema, avroSchema.toString());
    }

    /**
     * Loads the files reported by the writers into BigQuery.
     *
     * <p>When the write disposition is {@code WRITE_TRUNCATE}, the partition overwrite mode is
     * {@code DYNAMIC} and the destination table already exists, the data is loaded into a newly
     * created temporary table and then
     * {@code overwriteDestinationWithTemporaryDynamicPartitons} is run and waited on. In every
     * other case the data is loaded straight into the configured table. Exactly one load job is
     * issued per commit.
     *
     * @param messages commit messages of the writers; each must be a
     *     {@link BigQueryIndirectWriterCommitMessageContext}
     * @throws UncheckedIOException if the load fails with an {@link IOException}
     */
    @Override
    public void commit(WriterCommitMessageContext[] messages) {
        logger.info("Data has been successfully written to GCS. Going to load {} files to BigQuery", messages.length);
        try {
            List<String> sourceUris = Stream.of(messages).map(msg -> ((BigQueryIndirectWriterCommitMessageContext)msg).getUri()).collect(Collectors.toList());
            com.google.cloud.bigquery.Schema schema = SchemaConverters.from(SchemaConvertersConfiguration.from(this.config)).toBigQuerySchema(this.sparkSchema);
            if (this.tableInfo.isPresent()) {
                boolean allowFieldRelaxation = this.config.getLoadSchemaUpdateOptions().contains(JobInfo.SchemaUpdateOption.ALLOW_FIELD_RELAXATION);
                schema = BigQueryUtil.adjustSchemaIfNeeded(schema, this.tableInfo.get().getDefinition().getSchema(), allowFieldRelaxation);
            }
            boolean dynamicPartitionOverwrite = this.writeDisposition == JobInfo.WriteDisposition.WRITE_TRUNCATE
                    && this.config.getPartitionOverwriteModeValue() == PartitionOverwriteMode.DYNAMIC
                    && this.bigQueryClient.tableExists(this.config.getTableId());
            if (dynamicPartitionOverwrite) {
                // temporaryTableId must be set before loading: loadDataToBigQuery() targets it
                // whenever it is present.
                this.temporaryTableId = Optional.of(this.bigQueryClient.createTempTableAfterCheckingSchema(this.config.getTableId(), schema, this.config.getEnableModeCheckForSchemaFields()).getTableId());
                this.loadDataToBigQuery(sourceUris, schema);
                Job queryJob = this.bigQueryClient.overwriteDestinationWithTemporaryDynamicPartitons(this.temporaryTableId.get(), this.config.getTableId());
                this.bigQueryClient.waitForJob(queryJob);
            } else {
                // Only load directly when the temp-table path was not taken; otherwise the
                // same files would be loaded (and the metrics posted) a second time.
                this.loadDataToBigQuery(sourceUris, schema);
            }
            this.updateMetadataIfNeeded();
            logger.info("Data has been successfully loaded to BigQuery");
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        finally {
            this.cleanTemporaryGcsPathIfNeeded();
        }
    }

    /**
     * Logs the aborted write and cleans the staging path.
     *
     * @param messages commit messages of the writers; not used
     */
    @Override
    public void abort(WriterCommitMessageContext[] messages) {
        try {
            logger.warn("Aborting write {} for table {}", this.writeUUID, BigQueryUtil.friendlyTableName(this.config.getTableId()));
        }
        finally {
            this.cleanTemporaryGcsPathIfNeeded();
        }
    }

    /**
     * Records the destination table's info so that {@code commit} can adjust the load schema
     * against the table's schema.
     *
     * @param tableInfo the table info; {@code null} clears any previously set value
     */
    @Override
    public void setTableInfo(TableInfo tableInfo) {
        this.tableInfo = Optional.ofNullable(tableInfo);
    }

    /**
     * Runs an Avro load job for the given URIs and posts the write-session metrics.
     *
     * <p>The destination is the temporary table when one has been set, otherwise the configured
     * table.
     *
     * @param sourceUris URIs of the staged files
     * @param schema BigQuery schema passed to the load job
     * @throws IOException propagated from URI optimization or the load call
     */
    void loadDataToBigQuery(List<String> sourceUris, com.google.cloud.bigquery.Schema schema) throws IOException {
        List<String> optimizedSourceUris = SparkBigQueryUtil.optimizeLoadUriListForSpark(sourceUris);
        TableId destinationTableId = this.temporaryTableId.orElse(this.config.getTableId());
        JobStatistics.LoadStatistics loadStatistics = this.bigQueryClient.loadDataIntoTable((BigQueryClient.LoadDataOptions)this.config, optimizedSourceUris, FormatOptions.avro(), this.writeDisposition, Optional.of(schema), destinationTableId);
        long currentTimeMillis = System.currentTimeMillis();
        SparkBigQueryConnectorMetricsUtils.postWriteSessionMetrics(currentTimeMillis, SparkBigQueryConfig.WriteMethod.INDIRECT, loadStatistics.getOutputBytes(), Optional.of(this.config.getIntermediateFormat()), this.sparkContext);
    }

    /** Delegates to {@link BigQueryWriteHelper#updateTableMetadataIfNeeded}. */
    void updateMetadataIfNeeded() {
        BigQueryWriteHelper.updateTableMetadataIfNeeded(this.sparkSchema, this.config, this.bigQueryClient);
    }

    /** Invokes {@code deletePath()} on the intermediate data cleaner, if one was supplied. */
    void cleanTemporaryGcsPathIfNeeded() {
        this.intermediateDataCleaner.ifPresent(IntermediateDataCleaner::deletePath);
    }
}

