/*
 * Decompiled with CFR 0.152.
 */
package smile.io;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.util.Utf8;
import smile.data.DataFrame;
import smile.data.Tuple;
import smile.data.measure.NominalScale;
import smile.data.type.DataType;
import smile.data.type.DataTypes;
import smile.data.type.StructField;
import smile.data.type.StructType;

/**
 * Reads Apache Avro data files into a {@link DataFrame}, decoding every
 * record with a fixed Avro schema of type {@code RECORD}.
 */
public class Avro {
    /** The Avro record schema used to decode data files. */
    private final Schema schema;

    /**
     * Creates a reader for the given Avro schema.
     *
     * @param schema the Avro schema; its type must be {@code RECORD}.
     * @throws IllegalArgumentException if the schema is not a record schema.
     */
    public Avro(Schema schema) {
        if (schema.getType() != Schema.Type.RECORD) {
            throw new IllegalArgumentException("The type of schema is not Record");
        }
        this.schema = schema;
    }

    /**
     * Creates a reader whose schema is parsed from a schema file.
     *
     * @param schemaFile the path of the Avro schema file.
     * @throws IOException if the schema file cannot be opened or parsed.
     * @throws IllegalArgumentException if the parsed schema is not a record schema.
     */
    public Avro(Path schemaFile) throws IOException {
        this(parseSchema(schemaFile));
    }

    /**
     * Reads all records of an Avro data file.
     *
     * @param path the path of the Avro data file.
     * @return the data frame built from the records.
     * @throws IOException if the file cannot be opened or decoded.
     */
    public DataFrame read(Path path) throws IOException {
        return this.read(path, Integer.MAX_VALUE);
    }

    /**
     * Reads at most {@code limit} records of an Avro data file.
     *
     * @param path the path of the Avro data file.
     * @param limit the maximum number of records to read.
     * @return the data frame built from the records that were read.
     * @throws IOException if the file cannot be opened or decoded.
     */
    public DataFrame read(Path path, int limit) throws IOException {
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(this.schema);
        // The raw input stream is its own resource so that it is closed even
        // when the DataFileStream constructor fails (e.g. on a bad file header).
        try (InputStream input = Files.newInputStream(path);
             DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(input, datumReader)) {
            StructType struct = this.toSmileSchema(this.schema);
            List<Tuple> rows = new ArrayList<>();
            while (dataFileReader.hasNext() && rows.size() < limit) {
                // Pass no reuse instance: the rows keep references to the field
                // values, and Avro recycles the mutable values (arrays, maps,
                // byte buffers) of a reused record, which would overwrite the
                // content of previously collected rows.
                GenericRecord record = dataFileReader.next(null);
                Object[] row = new Object[struct.length()];
                for (int i = 0; i < row.length; ++i) {
                    StructField field = struct.field(i);
                    row[i] = toSmileValue(record.get(field.name), field);
                }
                rows.add(Tuple.of(row, struct));
            }
            return DataFrame.of(rows);
        }
    }

    /**
     * Parses an Avro schema from a file, closing the file afterwards.
     */
    private static Schema parseSchema(Path schemaFile) throws IOException {
        try (InputStream input = Files.newInputStream(schemaFile)) {
            return new Schema.Parser().parse(input);
        }
    }

    /**
     * Converts a decoded Avro value to the value stored in a row.
     * {@link Utf8} strings become {@link String}s, or the measure's value for
     * that string when the field has a measure. Enum symbols of a field with
     * a measure are converted through the measure likewise, because Avro's
     * generic reader yields {@link GenericEnumSymbol} rather than {@link Utf8}
     * for ENUM fields. Every other value is returned unchanged.
     */
    private static Object toSmileValue(Object value, StructField field) {
        boolean isText = value instanceof Utf8;
        boolean isScaledEnum = value instanceof GenericEnumSymbol && field.measure.isPresent();
        if (!isText && !isScaledEnum) {
            return value;
        }
        String text = value.toString();
        return field.measure.map(m -> (Object) m.valueOf(text)).orElse(text);
    }

    /**
     * Converts an Avro record schema to a Smile struct type. Fields whose
     * Avro type is ENUM get a {@link NominalScale} built from the enum symbols.
     */
    private StructType toSmileSchema(Schema schema) {
        List<StructField> fields = new ArrayList<>();
        for (Schema.Field field : schema.getFields()) {
            NominalScale scale = null;
            if (field.schema().getType() == Schema.Type.ENUM) {
                scale = new NominalScale(field.schema().getEnumSymbols());
            }
            fields.add(new StructField(field.name(), this.typeOf(field.schema()), Optional.ofNullable(scale)));
        }
        return DataTypes.struct(fields);
    }

    /**
     * Maps an Avro schema to the corresponding Smile data type.
     *
     * @throws UnsupportedOperationException if the Avro type has no mapping.
     */
    private DataType typeOf(Schema schema) {
        switch (schema.getType()) {
            case BOOLEAN:
                return DataTypes.BooleanType;
            case INT:
                return DataTypes.IntegerType;
            case LONG:
                return DataTypes.LongType;
            case FLOAT:
                return DataTypes.FloatType;
            case DOUBLE:
                return DataTypes.DoubleType;
            case STRING:
                return DataTypes.StringType;
            case FIXED:
            case BYTES:
                return DataTypes.ByteArrayType;
            case ENUM:
                return new NominalScale(schema.getEnumSymbols()).type();
            case ARRAY:
                return DataTypes.array(this.typeOf(schema.getElementType()));
            case MAP:
                return DataTypes.object(Map.class);
            case UNION:
                return this.unionType(schema);
            default:
                throw new UnsupportedOperationException("Unsupported Avro type: " + schema);
        }
    }

    /**
     * Maps an Avro union schema to a Smile data type. A single-branch union
     * maps to the type of its branch; a two-branch union with exactly one
     * NULL branch maps to the boxed type of the other branch; any other
     * two-branch union maps to a generic object type.
     *
     * @throws IllegalArgumentException if the union has no branches.
     * @throws UnsupportedOperationException if the union has more than two branches.
     */
    private DataType unionType(Schema schema) {
        List<Schema> union = schema.getTypes();
        if (union.isEmpty()) {
            throw new IllegalArgumentException("Empty type list of Union");
        }
        if (union.size() > 2) {
            String s = union.stream()
                    .map(Schema::getType)
                    .map(Object::toString)
                    .collect(Collectors.joining(", "));
            throw new UnsupportedOperationException(String.format("Unsupported type Union(%s)", s));
        }
        if (union.size() == 1) {
            return this.typeOf(union.get(0));
        }
        Schema a = union.get(0);
        Schema b = union.get(1);
        if (a.getType() == Schema.Type.NULL && b.getType() != Schema.Type.NULL) {
            return this.typeOf(b).boxed();
        }
        if (a.getType() != Schema.Type.NULL && b.getType() == Schema.Type.NULL) {
            return this.typeOf(a).boxed();
        }
        return DataTypes.object(Object.class);
    }
}

