/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Wraps a {@link Source} and exposes its next batch either as Avro {@link GenericRecord}s
 * ({@link #fetchNewDataInAvroFormat}) or as Spark {@link Row}s ({@link #fetchNewDataInRowFormat}),
 * converting from the source's own type (AVRO, JSON, ROW or PROTO) where needed.
 *
 * <p>When an error table writer is supplied, records that fail conversion are handed to it and
 * removed from the returned batch instead of being passed downstream.
 */
public final class SourceFormatAdapter
implements Closeable {
    private final Source source;
    /** Whether schema field names are sanitized during JSON conversion. */
    private final boolean shouldSanitize;
    /** Mask passed to the converter for characters that are invalid in field names. */
    private final String invalidCharMask;
    /** Sink for records that could not be converted; empty when no error table is configured. */
    private final Option<BaseErrorTableWriter> errorTableWriter;

    /**
     * Creates an adapter without an error table writer and with the default sanitization settings.
     *
     * @param source the source to read batches from
     */
    public SourceFormatAdapter(Source source) {
        this(source, Option.empty(), Option.empty());
    }

    /**
     * Creates an adapter.
     *
     * @param source           the source to read batches from
     * @param errorTableWriter optional sink for records that fail conversion
     * @param props            optional properties from which the sanitization settings are read;
     *                         when empty, the {@link HoodieStreamerConfig} defaults are used
     * @throws IllegalArgumentException if sanitization is enabled for a PROTO source
     */
    public SourceFormatAdapter(Source source, Option<BaseErrorTableWriter> errorTableWriter, Option<TypedProperties> props) {
        this.source = source;
        this.errorTableWriter = errorTableWriter;
        if (props.isPresent()) {
            this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get());
            this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
        } else {
            this.shouldSanitize = HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
            this.invalidCharMask = HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
        }
        if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) {
            throw new IllegalArgumentException("PROTO cannot be sanitized");
        }
    }

    private boolean isFieldNameSanitizingEnabled() {
        return this.shouldSanitize;
    }

    private String getInvalidCharMask() {
        return this.invalidCharMask;
    }

    /** Builds the exception thrown when the source reports a type none of the switch cases handle. */
    private IllegalArgumentException unknownSourceType() {
        return new IllegalArgumentException("Unknown source type (" + this.source.getSourceType() + ")");
    }

    /**
     * Converts a batch of JSON strings to Avro records using the batch's source schema.
     *
     * <p>With an error table writer present, strings that fail conversion are sent to it with
     * reason {@code JSON_AVRO_DESERIALIZATION_FAILURE} and only the successful records are returned.
     *
     * @param inputBatch the JSON batch fetched from the source
     * @return the converted records, or {@code null} when the batch carries no data
     */
    private JavaRDD<GenericRecord> transformJsonToGenericRdd(InputBatch<JavaRDD<String>> inputBatch) {
        // Resolve the schema once so the cache eviction and the converter agree on it.
        Schema sourceSchema = inputBatch.getSchemaProvider().getSourceSchema();
        MercifulJsonConverter.clearCache(sourceSchema.getFullName());
        AvroConvertor convertor = new AvroConvertor(sourceSchema, this.isFieldNameSanitizingEnabled(), this.getInvalidCharMask());
        return inputBatch.getBatch().map(rdd -> {
            if (!this.errorTableWriter.isPresent()) {
                return rdd.map(convertor::fromJson);
            }
            // fromJsonWithError yields a two-sided result: the right side is reported as an error
            // event, the left side is kept as the converted record.
            // NOTE(review): the element type of this RDD is not declared in this file, so the
            // decompiler's raw types and casts are kept as-is; restore the generic type from the
            // original sources.
            JavaRDD javaRDD = rdd.map(convertor::fromJsonWithError);
            this.errorTableWriter.get().addErrorEvents(javaRDD.filter((Function & Serializable)x -> x.isRight()).map((Function & Serializable)x -> new ErrorEvent<Object>(x.right().get(), ErrorEvent.ErrorReason.JSON_AVRO_DESERIALIZATION_FAILURE)));
            return javaRDD.filter((Function & Serializable)x -> x.isLeft()).map((Function & Serializable)x -> (GenericRecord)x.left().get());
        }).orElse(null);
    }

    /**
     * Splits corrupt records out of a row batch.
     *
     * <p>If an error table writer is present and the dataset has the corrupt-record column, rows
     * where that column is non-null are sent to the writer (as the column's string value tagged
     * with {@code errorReason}); the returned dataset keeps only rows where it is null and drops
     * the column. Otherwise the dataset is returned unchanged.
     *
     * @param eventsRow   the optional row batch
     * @param errorReason the reason attached to each reported error event
     * @return the batch without corrupt rows, or the input unchanged
     */
    public Option<Dataset<Row>> processErrorEvents(Option<Dataset<Row>> eventsRow, ErrorEvent.ErrorReason errorReason) {
        return eventsRow.map(dataset -> {
            boolean splitCorruptRecords = this.errorTableWriter.isPresent()
                && Arrays.asList(dataset.columns()).contains(BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
            if (!splitCorruptRecords) {
                return dataset;
            }
            Column corruptRecord = new Column(BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
            JavaRDD<Row> corruptRows = dataset.filter(corruptRecord.isNotNull()).select(corruptRecord).toJavaRDD();
            this.errorTableWriter.get().addErrorEvents(corruptRows.map(ev -> new ErrorEvent<String>(ev.getString(0), errorReason)));
            return dataset.filter(corruptRecord.isNull()).drop(BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
        });
    }

    /**
     * Fetches the next batch from the source and returns it as Avro records.
     *
     * @param lastCkptStr checkpoint to resume from, passed through to the source
     * @param sourceLimit limit passed through to the source
     * @return the batch as Avro records, with the source's next checkpoint and schema provider
     * @throws IllegalArgumentException if the source type is not AVRO, JSON, ROW or PROTO
     */
    public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
        switch (this.source.getSourceType()) {
            case AVRO: {
                return this.source.fetchNext(lastCkptStr, sourceLimit);
            }
            case JSON: {
                InputBatch<JavaRDD<String>> r = this.source.fetchNext(lastCkptStr, sourceLimit);
                JavaRDD<GenericRecord> eventsRdd = this.transformJsonToGenericRdd(r);
                return new InputBatch<>(Option.ofNullable(eventsRdd), r.getCheckpointForNextBatch(), r.getSchemaProvider());
            }
            case ROW: {
                InputBatch<Dataset<Row>> r = this.source.fetchNext(lastCkptStr, sourceLimit);
                return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> {
                    SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
                    // Only for file-based or schema-registry providers is the provider's source
                    // schema handed to the Row-to-Avro conversion.
                    boolean useProviderSchema = originalProvider instanceof FilebasedSchemaProvider
                        || originalProvider instanceof SchemaRegistryProvider;
                    if (useProviderSchema) {
                        return HoodieSparkUtils.createRdd(rdd, "hoodie_source", "hoodie.source", true, Option.ofNullable(r.getSchemaProvider().getSourceSchema())).toJavaRDD();
                    }
                    return HoodieSparkUtils.createRdd(rdd, "hoodie_source", "hoodie.source", false, Option.empty()).toJavaRDD();
                }).orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
            }
            case PROTO: {
                // NOTE(review): the proto message element type is not declared in this file, so the
                // batch is left raw as decompiled; restore its generic type from the original sources.
                InputBatch r = this.source.fetchNext(lastCkptStr, sourceLimit);
                AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
                return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromProtoMessage)).orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
            }
        }
        throw this.unknownSourceType();
    }

    /**
     * Converts an Avro batch to a row batch using the batch's source schema and the source's
     * Spark session.
     *
     * @param r the Avro batch
     * @return the same batch as rows, keeping its checkpoint and schema provider
     */
    private InputBatch<Dataset<Row>> avroDataInRowFormat(InputBatch<JavaRDD<GenericRecord>> r) {
        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
        return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(), this.source.getSparkSession())).orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
    }

    /**
     * Fetches the next batch from the source and returns it as Spark rows.
     *
     * @param lastCkptStr checkpoint to resume from, passed through to the source
     * @param sourceLimit limit passed through to the source
     * @return the batch as rows, with the source's next checkpoint and schema provider
     * @throws IllegalArgumentException if the source type is not ROW, AVRO, JSON or PROTO
     */
    public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
        switch (this.source.getSourceType()) {
            case ROW: {
                InputBatch<Dataset<Row>> datasetInputBatch = this.source.fetchNext(lastCkptStr, sourceLimit);
                return new InputBatch<>(this.processErrorEvents(datasetInputBatch.getBatch(), ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE), datasetInputBatch.getCheckpointForNextBatch(), datasetInputBatch.getSchemaProvider());
            }
            case AVRO: {
                InputBatch<JavaRDD<GenericRecord>> r = this.source.fetchNext(lastCkptStr, sourceLimit);
                return this.avroDataInRowFormat(r);
            }
            case JSON: {
                if (this.isFieldNameSanitizingEnabled()) {
                    // The sanitize flag is handed to the AvroConvertor on the Avro path, so go
                    // through that path and convert the result to rows.
                    InputBatch<JavaRDD<GenericRecord>> r = this.fetchNewDataInAvroFormat(lastCkptStr, sourceLimit);
                    return this.avroDataInRowFormat(r);
                }
                InputBatch<JavaRDD<String>> r = this.source.fetchNext(lastCkptStr, sourceLimit);
                Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
                if (this.errorTableWriter.isPresent()) {
                    // Read in PERMISSIVE mode with an extra nullable string column that receives
                    // the corrupt record, then split those rows off to the error table.
                    StructType withCorruptColumn = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema).add(new StructField(BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME, DataTypes.StringType, true, Metadata.empty()));
                    StructType nullableStruct = withCorruptColumn.asNullable();
                    Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> this.source.getSparkSession().read().option("columnNameOfCorruptRecord", BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME).schema(nullableStruct).option("mode", "PERMISSIVE").json(rdd));
                    Option<Dataset<Row>> eventsDataset = this.processErrorEvents(dataset, ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE);
                    return new InputBatch<>(eventsDataset, r.getCheckpointForNextBatch(), r.getSchemaProvider());
                }
                StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
                return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> this.source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
            }
            case PROTO: {
                // NOTE(review): the proto message element type is not declared in this file, so the
                // batch is left raw as decompiled; restore its generic type from the original sources.
                InputBatch r = this.source.fetchNext(lastCkptStr, sourceLimit);
                Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
                // Reuse the schema resolved above so conversion and DataFrame creation agree on it.
                AvroConvertor convertor = new AvroConvertor(sourceSchema);
                return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromProtoMessage)).map(rdd -> AvroConversionUtils.createDataFrame((RDD<GenericRecord>)JavaRDD.toRDD((JavaRDD)rdd), sourceSchema.toString(), this.source.getSparkSession())).orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
            }
        }
        throw this.unknownSourceType();
    }

    /** Returns the wrapped source. */
    public Source getSource() {
        return this.source;
    }

    /**
     * Closes the wrapped source if it implements {@link Closeable}; otherwise does nothing.
     *
     * @throws HoodieIOException if closing the source fails with an {@link IOException}
     */
    @Override
    public void close() {
        if (!(this.source instanceof Closeable)) {
            return;
        }
        try {
            ((Closeable) this.source).close();
        } catch (IOException e) {
            throw new HoodieIOException(String.format("Failed to shutdown the source (%s)", this.source.getClass().getName()), e);
        }
    }
}

