/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.shaded.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.source.Reader;
import org.apache.iceberg.spark.source.StreamingWriter;
import org.apache.iceberg.spark.source.Writer;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.types.CheckCompatibility;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

public class IcebergSource
implements DataSourceV2,
ReadSupport,
WriteSupport,
DataSourceRegister,
StreamWriteSupport {
    private SparkSession lazySpark = null;
    private Configuration lazyConf = null;

    public String shortName() {
        return "iceberg";
    }

    public DataSourceReader createReader(DataSourceOptions options) {
        Configuration conf = new Configuration(this.lazyBaseConf());
        Table table = this.getTableAndResolveHadoopConfiguration(options, conf);
        String caseSensitive = this.lazySparkSession().conf().get("spark.sql.caseSensitive", "true");
        return new Reader(table, Boolean.valueOf(caseSensitive), options);
    }

    public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode, DataSourceOptions options) {
        Preconditions.checkArgument(mode == SaveMode.Append || mode == SaveMode.Overwrite, "Save mode %s is not supported", (Object)mode);
        Configuration conf = new Configuration(this.lazyBaseConf());
        Table table = this.getTableAndResolveHadoopConfiguration(options, conf);
        this.validateWriteSchema(table.schema(), dsStruct);
        this.validatePartitionTransforms(table.spec());
        String appId = this.lazySparkSession().sparkContext().applicationId();
        String wapId = this.lazySparkSession().conf().get("spark.wap.id", null);
        return Optional.of(new Writer(table, options, mode == SaveMode.Overwrite, appId, wapId));
    }

    public StreamWriter createStreamWriter(String runId, StructType dsStruct, OutputMode mode, DataSourceOptions options) {
        Preconditions.checkArgument(mode == OutputMode.Append() || mode == OutputMode.Complete(), "Output mode %s is not supported", (Object)mode);
        Configuration conf = new Configuration(this.lazyBaseConf());
        Table table = this.getTableAndResolveHadoopConfiguration(options, conf);
        this.validateWriteSchema(table.schema(), dsStruct);
        this.validatePartitionTransforms(table.spec());
        String queryId = this.lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY());
        String appId = this.lazySparkSession().sparkContext().applicationId();
        return new StreamingWriter(table, options, queryId, mode, appId);
    }

    protected Table findTable(DataSourceOptions options, Configuration conf) {
        Optional path = options.get("path");
        Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set");
        if (((String)path.get()).contains("/")) {
            HadoopTables tables = new HadoopTables(conf);
            return tables.load((String)path.get());
        }
        HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
        TableIdentifier tableIdentifier = TableIdentifier.parse((String)path.get());
        return hiveCatalog.loadTable(tableIdentifier);
    }

    private SparkSession lazySparkSession() {
        if (this.lazySpark == null) {
            this.lazySpark = SparkSession.builder().getOrCreate();
        }
        return this.lazySpark;
    }

    private Configuration lazyBaseConf() {
        if (this.lazyConf == null) {
            this.lazyConf = this.lazySparkSession().sparkContext().hadoopConfiguration();
        }
        return this.lazyConf;
    }

    private Table getTableAndResolveHadoopConfiguration(DataSourceOptions options, Configuration conf) {
        IcebergSource.mergeIcebergHadoopConfs(conf, options.asMap());
        Table table = this.findTable(options, conf);
        IcebergSource.mergeIcebergHadoopConfs(conf, table.properties());
        IcebergSource.mergeIcebergHadoopConfs(conf, options.asMap());
        return table;
    }

    private static void mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
        options.keySet().stream().filter(key -> key.startsWith("hadoop.")).forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), (String)options.get(key)));
    }

    private void validateWriteSchema(Schema tableSchema, StructType dsStruct) {
        Schema dsSchema = SparkSchemaUtil.convert(tableSchema, dsStruct);
        List<String> errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
        if (!errors.isEmpty()) {
            StringBuilder sb = new StringBuilder();
            sb.append("Cannot write incompatible dataset to table with schema:\n").append(tableSchema).append("\nProblems:");
            for (String error : errors) {
                sb.append("\n* ").append(error);
            }
            throw new IllegalArgumentException(sb.toString());
        }
    }

    private void validatePartitionTransforms(PartitionSpec spec) {
        if (spec.fields().stream().anyMatch(field -> field.transform() instanceof UnknownTransform)) {
            String unsupported = spec.fields().stream().map(PartitionField::transform).filter(transform -> transform instanceof UnknownTransform).map(Object::toString).collect(Collectors.joining(", "));
            throw new UnsupportedOperationException(String.format("Cannot write using unsupported transforms: %s", unsupported));
        }
    }
}

