/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.Closeable;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

/**
 * Wraps a {@link Source} and hands out its next batch either as Avro records or as a Spark
 * {@code Dataset<Row>}, converting from the source's own type when the two differ.
 *
 * <p>The conversion is selected by {@code source.getSourceType()}; the checkpoint and the schema
 * provider of the fetched batch are carried over unchanged into the returned batch.
 */
public final class SourceFormatAdapter implements Closeable {

    /** The wrapped source; every fetch is delegated to it. */
    private final Source source;

    /**
     * Creates an adapter around the given source.
     *
     * @param source the source whose batches are fetched and converted
     */
    public SourceFormatAdapter(Source source) {
        this.source = source;
    }

    /**
     * Fetches the next batch from the source and returns it as an RDD of Avro records.
     *
     * <p>AVRO batches are returned as fetched; JSON, ROW and PROTO batches are converted.
     *
     * @param lastCkptStr forwarded unchanged to the source's {@code fetchNext}
     * @param sourceLimit forwarded unchanged to the source's {@code fetchNext}
     * @return a batch with the (possibly absent) converted data plus the checkpoint and schema
     *         provider of the fetched batch
     * @throws IllegalArgumentException if the source type is none of AVRO, JSON, ROW or PROTO
     */
    public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
        switch (this.source.getSourceType()) {
            case AVRO:
                return this.fetchNextBatch(lastCkptStr, sourceLimit);
            case JSON: {
                InputBatch<JavaRDD<String>> jsonBatch = this.fetchNextBatch(lastCkptStr, sourceLimit);
                AvroConvertor jsonDecoder = new AvroConvertor(jsonBatch.getSchemaProvider().getSourceSchema());
                Option<JavaRDD<GenericRecord>> decoded =
                        jsonBatch.getBatch().map(lines -> lines.map(jsonDecoder::fromJson));
                return new InputBatch<>(
                        Option.ofNullable(decoded.orElse(null)),
                        jsonBatch.getCheckpointForNextBatch(),
                        jsonBatch.getSchemaProvider());
            }
            case ROW: {
                InputBatch<Dataset<Row>> rowBatch = this.fetchNextBatch(lastCkptStr, sourceLimit);
                Option<JavaRDD<GenericRecord>> converted =
                        rowBatch.getBatch().map(rows -> rowsToAvro(rows, rowBatch));
                return new InputBatch<>(
                        Option.ofNullable(converted.orElse(null)),
                        rowBatch.getCheckpointForNextBatch(),
                        rowBatch.getSchemaProvider());
            }
            case PROTO: {
                InputBatch protoBatch = this.source.fetchNext(lastCkptStr, sourceLimit);
                AvroConvertor protoDecoder = new AvroConvertor(protoBatch.getSchemaProvider().getSourceSchema());
                Option<JavaRDD<GenericRecord>> decoded = decodeProtoBatch(protoBatch, protoDecoder);
                return new InputBatch<>(
                        Option.ofNullable(decoded.orElse(null)),
                        protoBatch.getCheckpointForNextBatch(),
                        protoBatch.getSchemaProvider());
            }
            default:
                throw new IllegalArgumentException("Unknown source type (" + this.source.getSourceType() + ")");
        }
    }

    /**
     * Fetches the next batch from the source and returns it as a Spark {@code Dataset<Row>}.
     *
     * <p>ROW batches are returned as fetched; AVRO, JSON and PROTO batches are converted using
     * the source schema of the batch's schema provider and the source's Spark session.
     *
     * @param lastCkptStr forwarded unchanged to the source's {@code fetchNext}
     * @param sourceLimit forwarded unchanged to the source's {@code fetchNext}
     * @return a batch with the (possibly absent) converted data plus the checkpoint and schema
     *         provider of the fetched batch
     * @throws IllegalArgumentException if the source type is none of ROW, AVRO, JSON or PROTO
     */
    public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
        switch (this.source.getSourceType()) {
            case ROW:
                return this.fetchNextBatch(lastCkptStr, sourceLimit);
            case AVRO: {
                InputBatch<JavaRDD<GenericRecord>> avroBatch = this.fetchNextBatch(lastCkptStr, sourceLimit);
                Schema sourceSchema = avroBatch.getSchemaProvider().getSourceSchema();
                Option<Dataset<Row>> rows =
                        avroBatch.getBatch().map(records -> this.avroToRows(records, sourceSchema));
                return new InputBatch<>(
                        Option.ofNullable(rows.orElse(null)),
                        avroBatch.getCheckpointForNextBatch(),
                        avroBatch.getSchemaProvider());
            }
            case JSON: {
                InputBatch<JavaRDD<String>> jsonBatch = this.fetchNextBatch(lastCkptStr, sourceLimit);
                Schema sourceSchema = jsonBatch.getSchemaProvider().getSourceSchema();
                StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
                Option<Dataset<Row>> rows = jsonBatch.getBatch()
                        .map(lines -> this.source.getSparkSession().read().schema(dataType).json(lines));
                return new InputBatch<>(
                        Option.ofNullable(rows.orElse(null)),
                        jsonBatch.getCheckpointForNextBatch(),
                        jsonBatch.getSchemaProvider());
            }
            case PROTO: {
                InputBatch protoBatch = this.source.fetchNext(lastCkptStr, sourceLimit);
                Schema sourceSchema = protoBatch.getSchemaProvider().getSourceSchema();
                // The source schema is looked up a second time for the convertor, as before.
                AvroConvertor protoDecoder = new AvroConvertor(protoBatch.getSchemaProvider().getSourceSchema());
                // Two steps: proto messages -> Avro records -> rows.
                Option<Dataset<Row>> rows = decodeProtoBatch(protoBatch, protoDecoder)
                        .map(records -> this.avroToRows(records, sourceSchema));
                return new InputBatch<>(
                        Option.ofNullable(rows.orElse(null)),
                        protoBatch.getCheckpointForNextBatch(),
                        protoBatch.getSchemaProvider());
            }
            default:
                throw new IllegalArgumentException("Unknown source type (" + this.source.getSourceType() + ")");
        }
    }

    /**
     * Returns the wrapped source.
     *
     * @return the source passed to the constructor
     */
    public Source getSource() {
        return this.source;
    }

    /**
     * Closes the wrapped source when it implements {@link Closeable}; otherwise does nothing.
     *
     * @throws HoodieIOException if closing the source fails with an {@link IOException}
     */
    @Override
    public void close() {
        if (!(this.source instanceof Closeable)) {
            return;
        }
        try {
            ((Closeable) this.source).close();
        } catch (IOException e) {
            throw new HoodieIOException(
                    String.format("Failed to shutdown the source (%s)", this.source.getClass().getName()), e);
        }
    }

    /**
     * Delegates to the source's {@code fetchNext} and views the result with the element type the
     * caller expects for the current source type. The field is a raw {@code Source}, so the cast
     * is unchecked.
     */
    @SuppressWarnings("unchecked")
    private <T> InputBatch<T> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
        return (InputBatch<T>) this.source.fetchNext(lastCkptStr, sourceLimit);
    }

    /**
     * Converts a row dataset to Avro records. When the batch's original schema provider is a
     * {@link FilebasedSchemaProvider}, the provider's source schema is passed to the conversion
     * together with the {@code true} flag; otherwise no schema is passed and the flag is
     * {@code false}.
     */
    private static JavaRDD<GenericRecord> rowsToAvro(Dataset<Row> rows, InputBatch<Dataset<Row>> rowBatch) {
        SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(rowBatch.getSchemaProvider());
        if (originalProvider instanceof FilebasedSchemaProvider) {
            return HoodieSparkUtils.createRdd(
                    rows,
                    "hoodie_source",
                    "hoodie.source",
                    true,
                    Option.ofNullable(rowBatch.getSchemaProvider().getSourceSchema())).toJavaRDD();
        }
        return HoodieSparkUtils.createRdd(rows, "hoodie_source", "hoodie.source", false, Option.empty()).toJavaRDD();
    }

    /** Builds a data frame from Avro records using the given schema and the source's Spark session. */
    private Dataset<Row> avroToRows(JavaRDD<GenericRecord> records, Schema sourceSchema) {
        return AvroConversionUtils.createDataFrame(
                JavaRDD.toRDD(records), sourceSchema.toString(), this.source.getSparkSession());
    }

    /**
     * Maps every element of a PROTO batch through {@code convertor.fromProtoMessage}.
     *
     * <p>NOTE(review): the element type of the PROTO batch was erased in this decompiled file and
     * the message class is not referenced anywhere in it, so the batch is deliberately left raw
     * here exactly as found; restore the concrete element type before compiling - TODO confirm.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Option<JavaRDD<GenericRecord>> decodeProtoBatch(InputBatch protoBatch, AvroConvertor convertor) {
        return protoBatch.getBatch().map(rdd -> rdd.map(convertor::fromProtoMessage));
    }
}

