/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.data;

import java.util.Deque;
import java.util.List;
import java.util.stream.Stream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.orc.GenericOrcWriters;
import org.apache.iceberg.flink.data.FlinkOrcWriters;
import org.apache.iceberg.flink.data.FlinkSchemaVisitor;
import org.apache.iceberg.orc.OrcRowWriter;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.shaded.org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class FlinkOrcWriter
implements OrcRowWriter<RowData> {
    private final FlinkOrcWriters.RowDataWriter writer;

    private FlinkOrcWriter(RowType rowType, Schema iSchema) {
        this.writer = (FlinkOrcWriters.RowDataWriter)FlinkSchemaVisitor.visit(rowType, iSchema, new WriteBuilder());
    }

    public static OrcRowWriter<RowData> buildWriter(RowType rowType, Schema iSchema) {
        return new FlinkOrcWriter(rowType, iSchema);
    }

    @Override
    public void write(RowData row, VectorizedRowBatch output) {
        Preconditions.checkArgument(row != null, "value must not be null");
        this.writer.writeRow(row, output);
    }

    @Override
    public List<OrcValueWriter<?>> writers() {
        return this.writer.writers();
    }

    @Override
    public Stream<FieldMetrics<?>> metrics() {
        return this.writer.metrics();
    }

    private static class WriteBuilder
    extends FlinkSchemaVisitor<OrcValueWriter<?>> {
        private final Deque<Integer> fieldIds = Lists.newLinkedList();

        private WriteBuilder() {
        }

        @Override
        public void beforeField(Types.NestedField field) {
            this.fieldIds.push(field.fieldId());
        }

        @Override
        public void afterField(Types.NestedField field) {
            this.fieldIds.pop();
        }

        @Override
        public OrcValueWriter<RowData> record(Types.StructType iStruct, List<OrcValueWriter<?>> results, List<LogicalType> fieldType) {
            return FlinkOrcWriters.struct(results, fieldType);
        }

        @Override
        public OrcValueWriter<?> map(Types.MapType iMap, OrcValueWriter<?> key, OrcValueWriter<?> value, LogicalType keyType, LogicalType valueType) {
            return FlinkOrcWriters.map(key, value, keyType, valueType);
        }

        @Override
        public OrcValueWriter<?> list(Types.ListType iList, OrcValueWriter<?> element, LogicalType elementType) {
            return FlinkOrcWriters.list(element, elementType);
        }

        @Override
        public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, LogicalType flinkPrimitive) {
            switch (iPrimitive.typeId()) {
                case BOOLEAN: {
                    return GenericOrcWriters.booleans();
                }
                case INTEGER: {
                    switch (flinkPrimitive.getTypeRoot()) {
                        case TINYINT: {
                            return GenericOrcWriters.bytes();
                        }
                        case SMALLINT: {
                            return GenericOrcWriters.shorts();
                        }
                    }
                    return GenericOrcWriters.ints();
                }
                case LONG: {
                    return GenericOrcWriters.longs();
                }
                case FLOAT: {
                    Preconditions.checkArgument(this.fieldIds.peek() != null, String.format("[BUG] Cannot find field id for primitive field with type %s. This is likely because id information is not properly pushed during schema visiting.", iPrimitive));
                    return GenericOrcWriters.floats(this.fieldIds.peek());
                }
                case DOUBLE: {
                    Preconditions.checkArgument(this.fieldIds.peek() != null, String.format("[BUG] Cannot find field id for primitive field with type %s. This is likely because id information is not properly pushed during schema visiting.", iPrimitive));
                    return GenericOrcWriters.doubles(this.fieldIds.peek());
                }
                case DATE: {
                    return FlinkOrcWriters.dates();
                }
                case TIME: {
                    return FlinkOrcWriters.times();
                }
                case TIMESTAMP: {
                    Types.TimestampType timestampType = (Types.TimestampType)iPrimitive;
                    if (timestampType.shouldAdjustToUTC()) {
                        return FlinkOrcWriters.timestampTzs();
                    }
                    return FlinkOrcWriters.timestamps();
                }
                case STRING: {
                    return FlinkOrcWriters.strings();
                }
                case UUID: 
                case FIXED: 
                case BINARY: {
                    return GenericOrcWriters.byteArrays();
                }
                case DECIMAL: {
                    Types.DecimalType decimalType = (Types.DecimalType)iPrimitive;
                    return FlinkOrcWriters.decimals(decimalType.precision(), decimalType.scale());
                }
            }
            throw new IllegalArgumentException(String.format("Invalid iceberg type %s corresponding to Flink logical type %s", iPrimitive, flinkPrimitive));
        }
    }
}

