/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.data;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.avro.AvroSchemaVisitor;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.shaded.com.google.common.collect.MapMaker;
import org.apache.iceberg.shaded.org.apache.avro.LogicalType;
import org.apache.iceberg.shaded.org.apache.avro.LogicalTypes;
import org.apache.iceberg.shaded.org.apache.avro.Schema;
import org.apache.iceberg.shaded.org.apache.avro.io.DatumReader;
import org.apache.iceberg.shaded.org.apache.avro.io.Decoder;
import org.apache.iceberg.shaded.org.apache.avro.io.DecoderFactory;
import org.apache.iceberg.shaded.org.apache.avro.io.ResolvingDecoder;
import org.apache.iceberg.spark.data.SparkValueReaders;
import org.apache.spark.sql.catalyst.InternalRow;

public class SparkAvroReader
implements DatumReader<InternalRow> {
    private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> DECODER_CACHES = ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
    private final Schema readSchema;
    private final ValueReader<InternalRow> reader;
    private Schema fileSchema = null;

    public SparkAvroReader(Schema readSchema) {
        this.readSchema = readSchema;
        this.reader = (ValueReader)AvroSchemaVisitor.visit(readSchema, new ReadBuilder());
    }

    @Override
    public void setSchema(Schema newFileSchema) {
        this.fileSchema = Schema.applyAliases(newFileSchema, this.readSchema);
    }

    @Override
    public InternalRow read(InternalRow reuse, Decoder decoder) throws IOException {
        ResolvingDecoder resolver = this.resolve(decoder);
        InternalRow row = this.reader.read(resolver, reuse);
        resolver.drain();
        return row;
    }

    private ResolvingDecoder resolve(Decoder decoder) throws IOException {
        Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
        Map fileSchemaToResolver = cache.computeIfAbsent(this.readSchema, k -> new HashMap());
        ResolvingDecoder resolver = (ResolvingDecoder)fileSchemaToResolver.get(this.fileSchema);
        if (resolver == null) {
            resolver = this.newResolver();
            fileSchemaToResolver.put(this.fileSchema, resolver);
        }
        resolver.configure(decoder);
        return resolver;
    }

    private ResolvingDecoder newResolver() {
        try {
            return DecoderFactory.get().resolvingDecoder(this.fileSchema, this.readSchema, null);
        }
        catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }

    private static class ReadBuilder
    extends AvroSchemaVisitor<ValueReader<?>> {
        private ReadBuilder() {
        }

        @Override
        public ValueReader<?> record(Schema record, List<String> names, List<ValueReader<?>> fields) {
            return SparkValueReaders.struct(fields);
        }

        @Override
        public ValueReader<?> union(Schema union, List<ValueReader<?>> options) {
            return ValueReaders.union(options);
        }

        @Override
        public ValueReader<?> array(Schema array, ValueReader<?> elementReader) {
            LogicalType logical = array.getLogicalType();
            if (logical != null && "map".equals(logical.getName())) {
                ValueReader<?>[] keyValueReaders = ((SparkValueReaders.StructReader)elementReader).readers();
                return SparkValueReaders.arrayMap(keyValueReaders[0], keyValueReaders[1]);
            }
            return SparkValueReaders.array(elementReader);
        }

        @Override
        public ValueReader<?> map(Schema map, ValueReader<?> valueReader) {
            return SparkValueReaders.map(SparkValueReaders.strings(), valueReader);
        }

        @Override
        public ValueReader<?> primitive(Schema primitive) {
            LogicalType logicalType = primitive.getLogicalType();
            if (logicalType != null) {
                switch (logicalType.getName()) {
                    case "date": {
                        return ValueReaders.ints();
                    }
                    case "timestamp-millis": {
                        ValueReader<Long> longs = ValueReaders.longs();
                        return (decoder, ignored) -> (Long)longs.read(decoder, null) * 1000L;
                    }
                    case "timestamp-micros": {
                        return ValueReaders.longs();
                    }
                    case "decimal": {
                        ValueReader<byte[]> inner;
                        switch (primitive.getType()) {
                            case FIXED: {
                                inner = ValueReaders.fixed(primitive.getFixedSize());
                                break;
                            }
                            case BYTES: {
                                inner = ValueReaders.bytes();
                                break;
                            }
                            default: {
                                throw new IllegalArgumentException("Invalid primitive type for decimal: " + (Object)((Object)primitive.getType()));
                            }
                        }
                        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)logicalType;
                        return SparkValueReaders.decimal(inner, decimal.getScale());
                    }
                    case "uuid": {
                        return SparkValueReaders.uuids();
                    }
                }
                throw new IllegalArgumentException("Unknown logical type: " + logicalType);
            }
            switch (primitive.getType()) {
                case NULL: {
                    return ValueReaders.nulls();
                }
                case BOOLEAN: {
                    return ValueReaders.booleans();
                }
                case INT: {
                    return ValueReaders.ints();
                }
                case LONG: {
                    return ValueReaders.longs();
                }
                case FLOAT: {
                    return ValueReaders.floats();
                }
                case DOUBLE: {
                    return ValueReaders.doubles();
                }
                case STRING: {
                    return SparkValueReaders.strings();
                }
                case FIXED: {
                    return ValueReaders.fixed(primitive.getFixedSize());
                }
                case BYTES: {
                    return ValueReaders.bytes();
                }
            }
            throw new IllegalArgumentException("Unsupported type: " + primitive);
        }
    }
}

