/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.streamer;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.exception.HoodieRecordCreationException;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.org.apache.spark.sql.avro.HoodieAvroDeserializer;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;

public class HoodieStreamerUtils {
    public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props, Option<JavaRDD<GenericRecord>> avroRDDOptional, SchemaProvider schemaProvider, HoodieRecord.HoodieRecordType recordType, boolean autoGenerateRecordKeys, String instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
        boolean shouldCombine = cfg.filterDupes != false || cfg.operation.equals((Object)WriteOperationType.UPSERT);
        boolean shouldErrorTable = errorTableWriter.isPresent() && props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
        boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
        Set<String> partitionColumns = HoodieStreamerUtils.getPartitionColumns(props);
        String payloadClassName = StringUtils.isNullOrEmpty(cfg.payloadClassName) ? HoodieRecordPayload.getAvroPayloadForMergeMode(cfg.recordMergeMode) : cfg.payloadClassName;
        return avroRDDOptional.map(avroRDD -> {
            JavaRDD records;
            SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
            SerializableSchema processedAvroSchema = new SerializableSchema(HoodieStreamerUtils.isDropPartitionColumns(props) != false ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
            if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
                records = avroRDD.mapPartitions((FlatMapFunction & Serializable)genericRecordIterator -> {
                    if (autoGenerateRecordKeys) {
                        props.setProperty("_hoodie.record.key.gen.partition.id", String.valueOf(TaskContext.getPartitionId()));
                        props.setProperty("_hoodie.record.key.gen.instant.time", instantTime);
                    }
                    BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)props);
                    return new CloseableMappingIterator<GenericRecord, Either>(ClosableIterator.wrap(genericRecordIterator), genRec -> {
                        try {
                            HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
                            GenericRecord gr = HoodieStreamerUtils.isDropPartitionColumns(props) != false ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                            HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload((String)payloadClassName, (GenericRecord)gr, (Comparable)((Comparable)HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, useConsistentLogicalTimestamp))) : DataSourceUtils.createPayload((String)payloadClassName, (GenericRecord)gr);
                            return Either.left(new HoodieAvroRecord<HoodieRecordPayload>(hoodieKey, payload));
                        }
                        catch (Exception e) {
                            return HoodieStreamerUtils.generateErrorRecordOrThrowException(genRec, e, shouldErrorTable);
                        }
                    });
                });
            } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
                records = avroRDD.mapPartitions((FlatMapFunction & Serializable)itr -> {
                    if (autoGenerateRecordKeys) {
                        props.setProperty("_hoodie.record.key.gen.partition.id", String.valueOf(TaskContext.getPartitionId()));
                        props.setProperty("_hoodie.record.key.gen.instant.time", instantTime);
                    }
                    BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)HoodieSparkKeyGeneratorFactory.createKeyGenerator((TypedProperties)props);
                    StructType baseStructType = AvroConversionUtils.convertAvroSchemaToStructType((Schema)processedAvroSchema.get());
                    StructType targetStructType = HoodieStreamerUtils.isDropPartitionColumns(props) != false ? AvroConversionUtils.convertAvroSchemaToStructType((Schema)HoodieAvroUtils.removeFields(processedAvroSchema.get(), partitionColumns)) : baseStructType;
                    HoodieAvroDeserializer deserializer = SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(), (DataType)baseStructType);
                    return new CloseableMappingIterator<GenericRecord, Either>(ClosableIterator.wrap(itr), rec -> {
                        InternalRow row = (InternalRow)deserializer.deserialize(rec).get();
                        try {
                            String recordKey = builtinKeyGenerator.getRecordKey(row, baseStructType).toString();
                            String partitionPath = builtinKeyGenerator.getPartitionPath(row, baseStructType).toString();
                            return Either.left(new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath), (InternalRow)HoodieInternalRowUtils.getCachedUnsafeProjection((StructType)baseStructType, (StructType)targetStructType).apply(row), targetStructType, false));
                        }
                        catch (Exception e) {
                            return HoodieStreamerUtils.generateErrorRecordOrThrowException(rec, e, shouldErrorTable);
                        }
                    });
                });
            } else {
                throw new UnsupportedOperationException(recordType.name());
            }
            if (shouldErrorTable) {
                ((BaseErrorTableWriter)errorTableWriter.get()).addErrorEvents(records.filter(Either::isRight).map(Either::asRight).map((Function & Serializable)evStr -> new ErrorEvent<String>((String)evStr, ErrorEvent.ErrorReason.RECORD_CREATION)));
            }
            return records.filter(Either::isLeft).map(Either::asLeft);
        });
    }

    private static Either<HoodieRecord, String> generateErrorRecordOrThrowException(GenericRecord genRec, Exception e, boolean shouldErrorTable) {
        if (!shouldErrorTable) {
            if (e instanceof HoodieKeyException) {
                throw (HoodieKeyException)e;
            }
            if (e instanceof HoodieKeyGeneratorException) {
                throw (HoodieKeyGeneratorException)e;
            }
            throw new HoodieRecordCreationException("Failed to create Hoodie Record", e);
        }
        try {
            return Either.right(HoodieAvroUtils.safeAvroToJsonString(genRec));
        }
        catch (Exception ex) {
            throw new HoodieException("Failed to convert illegal record to json", ex);
        }
    }

    static Boolean isDropPartitionColumns(TypedProperties props) {
        return props.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue());
    }

    static Set<String> getPartitionColumns(TypedProperties props) {
        String partitionColumns = SparkKeyGenUtils.getPartitionColumns((TypedProperties)props);
        return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
    }
}

