/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.parquet.io;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.CompressionConverter;
import org.apache.parquet.hadoop.util.HadoopCodecs;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HoodieParquetBinaryCopyBase
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetBinaryCopyBase.class);
    // Metadata key name for a "created by" value.
    // NOTE(review): never read or written inside this class; presumably consumed by subclasses/callers — confirm.
    public static final String ORIGINAL_CREATED_BY_KEY = "original.created.by";
    // 0x200000 bytes = 2 MiB. NOTE(review): this field is not referenced in this class; readBlock compares against the literal instead.
    private final int pageBufferSize = 0x200000;
    // Scratch buffer that readBlock reuses for payloads of at most 2 MiB, so its content is only valid until the next readBlock call.
    private final byte[] pageBuffer = new byte[0x200000];
    // Target codec for rewritten chunks; processBlocksFromReader treats null as "keep each chunk's own codec".
    private CompressionCodecName newCodecName = null;
    // Columns whose values are replaced by a constant while copying; initFileWriter registers the file-name meta column here.
    private Map<ColumnPath, Binary> maskColumns = new HashMap<ColumnPath, Binary>();
    // Output writer; stays null until initFileWriter has run.
    private ParquetFileWriter writer;
    // Count of row groups written so far; also handed to ColumnChunkPageWriteStore as its last constructor argument.
    private int numBlocksRewritten = 0;
    // Running sum of the row counts of all row groups passed to processBlocksFromReader.
    protected long totalRecordsWritten = 0L;
    // Schema of the output file; set by initFileWriter.
    protected MessageType requiredSchema = null;
    // Hadoop configuration supplied to the constructor; used to open the output file.
    protected Configuration conf;
    // Tri-state flag: null until setSchemaEvolutionEnabled is called; processBlocksFromReader rejects null.
    protected Boolean schemaEvolutionEnabled = null;

    /**
     * Creates a copier that keeps the given Hadoop configuration for later use.
     *
     * @param conf configuration stored in {@link #conf}; {@code initFileWriter} uses it to open the output file
     */
    public HoodieParquetBinaryCopyBase(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Sets the schema-evolution flag. It must be set before
     * {@code processBlocksFromReader} runs, because that method rejects an unset (null) flag.
     *
     * @param enabled whether input files may differ from the required schema
     */
    public void setSchemaEvolutionEnabled(boolean enabled) {
        this.schemaEvolutionEnabled = Boolean.valueOf(enabled);
    }

    /**
     * Opens the Parquet writer for the output file and records the copy settings.
     *
     * <p>Registers the Hudi file-name meta column as a masked column whose replacement value is the
     * output file's name, stores the target schema and codec, then creates and starts the writer.
     *
     * @param outPutFile   path of the file to create
     * @param newCodecName codec for rewritten chunks; stored as-is
     * @param schema       schema of the output file
     * @throws HoodieException wrapping any exception raised while setting up the writer
     */
    protected void initFileWriter(Path outPutFile, CompressionCodecName newCodecName, MessageType schema) {
        // Writer tuning values, named after the ParquetFileWriter constructor parameters they feed.
        final long rowGroupSize = 0x8000000L;
        final int maxPaddingSize = 0x800000;
        final int columnIndexTruncateLength = 64;
        final int statisticsTruncateLength = Integer.MAX_VALUE;
        final boolean pageWriteChecksumEnabled = true;
        try {
            Binary outputFileName = Binary.fromString(outPutFile.getName());
            ColumnPath fileNameColumn = ColumnPath.fromDotString(HoodieRecord.FILENAME_METADATA_FIELD);
            this.maskColumns.put(fileNameColumn, outputFileName);
            this.requiredSchema = schema;
            this.newCodecName = newCodecName;
            OutputFile target = HadoopOutputFile.fromPath(outPutFile, this.conf);
            this.writer = new ParquetFileWriter(target, schema, ParquetFileWriter.Mode.CREATE, rowGroupSize,
                maxPaddingSize, columnIndexTruncateLength, statisticsTruncateLength, pageWriteChecksumEnabled);
            this.writer.start();
            LOG.info("init writer ");
        }
        catch (Exception cause) {
            LOG.error("failed to init parquet writer", cause);
            throw new HoodieException(cause);
        }
    }

    /**
     * Finishes the output file by writing the footer with the metadata from {@link #finalizeMetadata()}.
     *
     * <p>The keys {@code parquet.avro.schema} and {@code org.apache.spark.sql.parquet.row.metadata}
     * are dropped before the footer is written. The metadata is copied first, so a map returned by
     * a subclass is never modified and may be unmodifiable.
     *
     * @throws IOException if the writer fails to write the footer
     */
    @Override
    public void close() throws IOException {
        Map<String, String> finalized = this.finalizeMetadata();
        // Work on a private copy: removing keys must not touch (or fail on) the subclass's map.
        Map<String, String> extraMetaData = finalized == null
            ? new HashMap<String, String>()
            : new HashMap<String, String>(finalized);
        extraMetaData.remove("parquet.avro.schema");
        extraMetaData.remove("org.apache.spark.sql.parquet.row.metadata");
        this.writer.end(extraMetaData);
    }

    /**
     * Supplies the key/value metadata for the output file's footer.
     *
     * <p>Called by {@link #close()}, which treats a {@code null} result as an empty map and removes
     * the keys {@code parquet.avro.schema} and {@code org.apache.spark.sql.parquet.row.metadata}
     * before passing the metadata to the writer.
     *
     * @return footer metadata, possibly {@code null}
     */
    protected abstract Map<String, String> finalizeMetadata();

    /**
     * Copies one row group from {@code reader} into the output file.
     *
     * <p>Each source column chunk is handled in one of four ways:
     * <ul>
     *   <li>skipped, when its path is not in the required schema (with schema evolution enabled,
     *       legacy list/map paths are first translated to the current naming);</li>
     *   <li>rewritten with a constant value, when its path is registered in the mask columns;</li>
     *   <li>re-compressed page by page, when a target codec is set and differs from the chunk's codec;</li>
     *   <li>appended byte-for-byte otherwise.</li>
     * </ul>
     * Columns of the required schema that the input file lacks are then filled with nulls, which
     * is only allowed when schema evolution is enabled.
     *
     * @param reader            reader positioned on the source file
     * @param store             pages of the row group; {@code null} makes this call a no-op
     * @param block             metadata of the row group being copied
     * @param originalCreatedBy "created by" string of the source file, used when decoding statistics
     * @throws IOException     if reading or writing fails
     * @throws HoodieException if the schema-evolution flag was never set, or if evolution is disabled
     *                         and the input lacks required columns
     */
    public void processBlocksFromReader(CompressionConverter.TransParquetFileReader reader, PageReadStore store, BlockMetaData block, String originalCreatedBy) throws IOException {
        if (store == null) {
            LOG.info("stores is empty");
            return;
        }
        // Validate up front so a misconfigured call fails before any counter or writer state changes,
        // and also when the row group has no column chunks.
        if (this.schemaEvolutionEnabled == null) {
            throw new HoodieException("The variable 'schemaEvolutionEnabled' is supposed to be set in binaryCopy() before calling this method processBlocksFromReader");
        }
        boolean evolutionEnabled = this.schemaEvolutionEnabled.booleanValue();
        this.totalRecordsWritten += store.getRowCount();
        ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), this.requiredSchema, originalCreatedBy);
        Map<ColumnPath, ColumnDescriptor> descriptorsMap = this.requiredSchema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
        this.writer.startBlock(store.getRowCount());
        List<ColumnChunkMetaData> columnsInOrder = block.getColumns();
        List<ColumnDescriptor> converted = new ArrayList<ColumnDescriptor>();
        for (ColumnChunkMetaData sourceChunk : columnsInOrder) {
            ColumnChunkMetaData chunk = sourceChunk;
            ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
            if (descriptor == null && evolutionEnabled) {
                // Convert a private copy so the source chunk's own path array is never modified.
                String[] path = chunk.getPath().toArray();
                path = Arrays.copyOf(path, path.length);
                if (this.convertLegacy3LevelArray(path) || this.convertLegacyMap(path)) {
                    chunk = this.copyChunkWithPath(chunk, ColumnPath.get(path));
                    descriptor = descriptorsMap.get(chunk.getPath());
                    if (descriptor != null) {
                        converted.add(descriptor);
                    }
                }
            }
            if (descriptor == null) {
                continue;
            }
            reader.setStreamPosition(chunk.getStartingPos());
            CompressionCodecName targetCodec = this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
            if (this.maskColumns != null && this.maskColumns.containsKey(chunk.getPath())) {
                if (!chunk.getPath().toDotString().equals(HoodieRecord.FILENAME_METADATA_FIELD) && !evolutionEnabled) {
                    throw new HoodieException("Column masking for '" + chunk.getPath().toDotString() + "' requires schema evolution to be enabled. Set 'hoodie.clustering.plan.strategy.binary.copy.schema.evolution.enable' to true.");
                }
                Binary maskValue = this.maskColumns.get(chunk.getPath());
                if (maskValue != null) {
                    this.maskColumn(descriptor, chunk, crStore, this.writer, this.requiredSchema, targetCodec, maskValue);
                }
                continue;
            }
            if (targetCodec != chunk.getCodec()) {
                // Codec change: pages must be decompressed and re-compressed one by one.
                this.writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), targetCodec);
                this.processChunk(reader, chunk, targetCodec, false, originalCreatedBy);
                this.writer.endColumn();
                continue;
            }
            BloomFilter bloomFilter = reader.readBloomFilter(chunk);
            ColumnIndex columnIndex = reader.readColumnIndex(chunk);
            OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
            this.writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
        }
        ParquetMetadata meta = reader.getFooter();
        List<ColumnDescriptor> missedColumns = this.missedColumns(this.requiredSchema, meta.getFileMetaData().getSchema()).stream().filter(c -> !converted.contains(c)).collect(Collectors.toList());
        if (!evolutionEnabled && !missedColumns.isEmpty()) {
            String missingColumnsStr = missedColumns.stream().map(c -> String.join(".", c.getPath())).collect(Collectors.joining(", "));
            throw new HoodieException("Schema evolution is disabled but found missing columns in input file: " + missingColumnsStr + ". All input files must have the same schema when schema evolution is disabled.");
        }
        if (!missedColumns.isEmpty()) {
            if (columnsInOrder.isEmpty()) {
                throw new HoodieException("Cannot add null columns to a row group that has no column chunks");
            }
            // The first chunk of the row group decides the page version of the synthesized columns
            // and, when no target codec is configured, their codec as well.
            ColumnChunkMetaData firstChunk = columnsInOrder.get(0);
            EncodingStats encodingStats = firstChunk.getEncodingStats();
            CompressionCodecName nullColumnCodec = this.newCodecName != null ? this.newCodecName : firstChunk.getCodec();
            for (ColumnDescriptor descriptor : missedColumns) {
                this.addNullColumn(descriptor, store.getRowCount(), encodingStats, this.writer, this.requiredSchema, nullColumnCodec);
            }
        }
        this.writer.endBlock();
        ++this.numBlocksRewritten;
    }

    /** Rebuilds the chunk metadata under {@code newPath}, carrying over offsets, stats and index references. */
    private ColumnChunkMetaData copyChunkWithPath(ColumnChunkMetaData chunk, ColumnPath newPath) {
        ColumnChunkMetaData renamed = ColumnChunkMetaData.get(newPath, chunk.getPrimitiveType(), chunk.getCodec(), chunk.getEncodingStats(), chunk.getEncodings(), chunk.getStatistics(), chunk.getFirstDataPageOffset(), chunk.getDictionaryPageOffset(), chunk.getValueCount(), chunk.getTotalSize(), chunk.getTotalUncompressedSize());
        renamed.setRowGroupOrdinal(chunk.getRowGroupOrdinal());
        renamed.setBloomFilterOffset(chunk.getBloomFilterOffset());
        renamed.setColumnIndexReference(chunk.getColumnIndexReference());
        renamed.setOffsetIndexReference(chunk.getOffsetIndexReference());
        return renamed;
    }

    /**
     * Re-writes every page of a column chunk through the writer, translating the compression codec
     * when the target codec differs from the chunk's codec.
     *
     * <p>Pages are read sequentially from the chunk's starting position until the chunk's value
     * count has been consumed. Per-page statistics come from the column index when one exists,
     * otherwise from the page header.
     *
     * @param reader            source reader; its stream position is moved by this method
     * @param chunk             metadata of the chunk to copy
     * @param newCodecName      codec of the written pages
     * @param encryptColumn     whether page payloads are encrypted; no encryptor is configured here,
     *                          so callers in this class pass {@code false}
     * @param originalCreatedBy "created by" string of the source file, used when decoding statistics
     * @throws IOException if reading or writing fails, or the chunk has more than one dictionary page
     */
    private void processChunk(CompressionConverter.TransParquetFileReader reader, ColumnChunkMetaData chunk, CompressionCodecName newCodecName, boolean encryptColumn, String originalCreatedBy) throws IOException {
        CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
        try {
            CompressionCodecFactory.BytesInputDecompressor decompressor = null;
            CompressionCodecFactory.BytesInputCompressor compressor = null;
            if (!newCodecName.equals(chunk.getCodec())) {
                decompressor = codecFactory.getDecompressor(chunk.getCodec());
                compressor = codecFactory.getCompressor(newCodecName);
            }
            // Encryption is not wired up: all encryptors and AADs stay null.
            BlockCipher.Encryptor metaEncryptor = null;
            BlockCipher.Encryptor dataEncryptor = null;
            byte[] dictPageAAD = null;
            byte[] dataPageAAD = null;
            byte[] dictPageHeaderAAD = null;
            ColumnIndex columnIndex = reader.readColumnIndex(chunk);
            OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
            // Reading the indexes may have moved the stream; go back to the first page.
            reader.setStreamPosition(chunk.getStartingPos());
            boolean dictionaryPageSeen = false;
            long readValues = 0L;
            ParquetMetadataConverter converter = new ParquetMetadataConverter();
            int pageOrdinal = 0;
            long totalChunkValues = chunk.getValueCount();
            while (readValues < totalChunkValues) {
                PageHeader pageHeader = reader.readPageHeader();
                int compressedPageSize = pageHeader.getCompressed_page_size();
                switch (pageHeader.type) {
                    case DICTIONARY_PAGE: {
                        if (dictionaryPageSeen) {
                            throw new IOException("has more than one dictionary page in column chunk");
                        }
                        dictionaryPageSeen = true;
                        DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
                        byte[] pageLoad = this.processPageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size(), encryptColumn, dataEncryptor, dictPageAAD);
                        this.writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad), pageHeader.getUncompressed_page_size(), dictPageHeader.getNum_values(), converter.getEncoding(dictPageHeader.getEncoding())), metaEncryptor, dictPageHeaderAAD);
                        break;
                    }
                    case DATA_PAGE: {
                        DataPageHeader headerV1 = pageHeader.data_page_header;
                        byte[] pageLoad = this.processPageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size(), encryptColumn, dataEncryptor, dataPageAAD);
                        Statistics statistics = this.convertStatistics(originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
                        readValues += (long)headerV1.getNum_values();
                        if (offsetIndex != null) {
                            long rowCount = 1L + offsetIndex.getLastRowIndex(pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
                            this.writer.writeDataPage(this.toIntWithCheck(headerV1.getNum_values()), pageHeader.getUncompressed_page_size(), BytesInput.from(pageLoad), statistics, (long)this.toIntWithCheck(rowCount), converter.getEncoding(headerV1.getRepetition_level_encoding()), converter.getEncoding(headerV1.getDefinition_level_encoding()), converter.getEncoding(headerV1.getEncoding()));
                        } else {
                            this.writer.writeDataPage(this.toIntWithCheck(headerV1.getNum_values()), pageHeader.getUncompressed_page_size(), BytesInput.from(pageLoad), statistics, converter.getEncoding(headerV1.getRepetition_level_encoding()), converter.getEncoding(headerV1.getDefinition_level_encoding()), converter.getEncoding(headerV1.getEncoding()));
                        }
                        ++pageOrdinal;
                        break;
                    }
                    case DATA_PAGE_V2: {
                        DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
                        // In V2 pages the level bytes precede the payload and are copied as-is
                        // (they are excluded from the payload lengths passed on below).
                        int rlLength = headerV2.getRepetition_levels_byte_length();
                        BytesInput rlLevels = this.readBlockAllocate(rlLength, reader);
                        int dlLength = headerV2.getDefinition_levels_byte_length();
                        BytesInput dlLevels = this.readBlockAllocate(dlLength, reader);
                        int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
                        int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
                        byte[] pageLoad = this.processPageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength, encryptColumn, dataEncryptor, dataPageAAD);
                        Statistics statistics = this.convertStatistics(originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
                        readValues += (long)headerV2.getNum_values();
                        this.writer.writeDataPageV2(headerV2.getNum_rows(), headerV2.getNum_nulls(), headerV2.getNum_values(), rlLevels, dlLevels, converter.getEncoding(headerV2.getEncoding()), BytesInput.from(pageLoad), rawDataLength, statistics);
                        ++pageOrdinal;
                        break;
                    }
                    default: {
                        LOG.debug("skipping page of type {} of size {}", (Object)pageHeader.getType(), (Object)compressedPageSize);
                        // Actually consume the payload; otherwise the next header read would start
                        // in the middle of this page.
                        this.readBlock(compressedPageSize, reader);
                        break;
                    }
                }
            }
        } finally {
            // Hand the (de)compressors obtained above back to the factory.
            codecFactory.release();
        }
    }

    /**
     * Builds the statistics for one data page.
     *
     * <p>When a column index is available it is the source: null count always, min/max only for
     * pages that are not all-null. Otherwise the statistics embedded in the page header are
     * converted, and {@code null} is returned when the header carries none.
     *
     * @param createdBy      "created by" string of the source file
     * @param type           primitive type of the column
     * @param pageStatistics statistics from the page header, may be {@code null}
     * @param columnIndex    column index of the chunk, may be {@code null}
     * @param pageIndex      zero-based ordinal of the data page within the chunk
     * @param converter      converter used for header statistics
     * @return the page statistics, or {@code null} if none are available
     * @throws IOException if the column index has no null-page list or has fewer entries than pages
     */
    private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics, ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException {
        if (columnIndex != null) {
            List<Boolean> nullPages = columnIndex.getNullPages();
            if (nullPages == null) {
                throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " + type.getName());
            }
            // pageIndex is zero-based, so an index equal to the size is already out of range.
            if (pageIndex >= nullPages.size()) {
                throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + nullPages.size());
            }
            Statistics.Builder statsBuilder = Statistics.getBuilderForReading(type);
            statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex).longValue());
            if (!nullPages.get(pageIndex).booleanValue()) {
                statsBuilder.withMin(HoodieParquetBinaryCopyBase.copyRemaining(columnIndex.getMinValues().get(pageIndex)));
                statsBuilder.withMax(HoodieParquetBinaryCopyBase.copyRemaining(columnIndex.getMaxValues().get(pageIndex)));
            }
            return statsBuilder.build();
        }
        if (pageStatistics != null) {
            return converter.fromParquetStatistics(createdBy, pageStatistics, type);
        }
        return null;
    }

    /**
     * Copies the bytes between position and limit without moving the buffer's position. Unlike
     * {@code array().clone()} this respects offset/limit and works for read-only and direct buffers.
     */
    private static byte[] copyRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /**
     * Reads one page payload from the reader and returns the bytes to be written.
     *
     * <p>If a compressor is given, the payload is decompressed first (only when
     * {@code isCompressed}) and then re-compressed. If {@code encrypt} is set, the resulting bytes
     * are passed through {@code dataEncryptor}.
     *
     * @param payloadLength number of bytes to read from the reader
     * @param rawDataLength uncompressed size handed to the decompressor
     * @param add           AAD passed to the encryptor
     * @return the page bytes after optional codec translation and encryption
     * @throws IOException if reading, (de)compression or encryption fails
     */
    private byte[] processPageLoad(CompressionConverter.TransParquetFileReader reader, boolean isCompressed, CompressionCodecFactory.BytesInputCompressor compressor, CompressionCodecFactory.BytesInputDecompressor decompressor, int payloadLength, int rawDataLength, boolean encrypt, BlockCipher.Encryptor dataEncryptor, byte[] add) throws IOException {
        BytesInput payload = this.readBlock(payloadLength, reader);
        if (compressor != null) {
            BytesInput plain = isCompressed ? decompressor.decompress(payload, rawDataLength) : payload;
            payload = compressor.compress(plain);
        }
        byte[] bytes = payload.toByteArray();
        return encrypt ? dataEncryptor.encrypt(bytes, add) : bytes;
    }

    /**
     * Reads {@code length} bytes from the reader's current position.
     *
     * <p>Payloads that fit into the shared page buffer are read into it, so the returned
     * {@link BytesInput} is only valid until the next call; larger payloads get their own array.
     *
     * @param length number of bytes to read
     * @param reader source reader
     * @return a view over the bytes that were read
     * @throws IOException if the read fails
     */
    public BytesInput readBlock(int length, CompressionConverter.TransParquetFileReader reader) throws IOException {
        byte[] target = this.pageBuffer;
        if (length > target.length) {
            target = new byte[length];
        }
        reader.blockRead(target, 0, length);
        return BytesInput.from(target, 0, length);
    }

    /**
     * Reads {@code length} bytes from the reader's current position into a freshly allocated
     * array, so the result does not share storage with any other read.
     *
     * @param length number of bytes to read
     * @param reader source reader
     * @return the bytes that were read
     * @throws IOException if the read fails
     */
    public BytesInput readBlockAllocate(int length, CompressionConverter.TransParquetFileReader reader) throws IOException {
        final byte[] fresh = new byte[length];
        reader.blockRead(fresh, 0, fresh.length);
        return BytesInput.from(fresh, 0, fresh.length);
    }

    /**
     * Narrows a {@code long} to {@code int}, refusing values that do not fit.
     *
     * @param size value to narrow
     * @return {@code size} as an {@code int}
     * @throws ParquetEncodingException if {@code size} is outside the {@code int} range
     */
    private int toIntWithCheck(long size) {
        boolean fitsInInt = size >= Integer.MIN_VALUE && size <= Integer.MAX_VALUE;
        if (!fitsInInt) {
            throw new ParquetEncodingException("size is bigger than 2147483647 bytes: " + size);
        }
        return (int)size;
    }

    /**
     * Writes a replacement column chunk in which every value is {@code maskValue}.
     *
     * <p>The chunk is produced by a single-column write store and flushed into {@code writer}.
     * The page version (V1/V2) follows the source chunk's encoding stats; V1 is used when the
     * source chunk carries no encoding stats.
     *
     * @param descriptor   descriptor of the column being masked
     * @param chunk        source chunk; supplies the value count and encoding stats
     * @param crStore      read store used to obtain the source column's levels
     * @param writer       output file writer
     * @param schema       full output schema; reduced to this one column internally
     * @param newCodecName codec of the written chunk
     * @param maskValue    constant written for every value
     * @throws IOException if writing fails
     */
    private void maskColumn(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, ColumnReadStoreImpl crStore, ParquetFileWriter writer, MessageType schema, CompressionCodecName newCodecName, Binary maskValue) throws IOException {
        long totalChunkValues = chunk.getValueCount();
        ColumnReader cReader = crStore.getColumnReader(descriptor);
        EncodingStats encodingStats = chunk.getEncodingStats();
        boolean usesV2Pages = encodingStats != null && encodingStats.usesV2Pages();
        ParquetProperties.WriterVersion writerVersion = usesV2Pages ? ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0;
        ParquetProperties props = ParquetProperties.builder().withWriterVersion(writerVersion).build();
        CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
        try {
            CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(newCodecName);
            MessageType newSchema = this.newSchema(schema, descriptor);
            ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled(), null, this.numBlocksRewritten);
            ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, (PageWriteStore)cPageStore);
            ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
            // A long counter: the value count is a long, and an int counter would wrap around
            // and never terminate for counts above Integer.MAX_VALUE.
            for (long i = 0L; i < totalChunkValues; ++i) {
                // NOTE(review): the reader is never advanced here (no consume()), so every
                // iteration sees the levels of the chunk's first value — confirm this is intended.
                int rlvl = cReader.getCurrentRepetitionLevel();
                int dlvl = cReader.getCurrentDefinitionLevel();
                cWriter.write(maskValue, rlvl, dlvl);
                cStore.endRecord();
            }
            cStore.flush();
            cPageStore.flushToFileWriter(writer);
            cStore.close();
            cWriter.close();
        } finally {
            // Hand the compressor obtained above back to the factory.
            codecFactory.release();
        }
    }

    /**
     * Writes a column chunk consisting solely of nulls, for a column the input file lacks.
     *
     * <p>One null (repetition level 0, definition level 0) is written per row. The page version
     * (V1/V2) follows {@code encodingStats}, defaulting to V1 when it is {@code null}.
     *
     * @param descriptor       descriptor of the column to synthesize
     * @param totalChunkValues number of nulls to write (the row count of the row group)
     * @param encodingStats    encoding stats of a chunk from the same row group, may be {@code null}
     * @param writer           output file writer
     * @param schema           full output schema; reduced to this one column internally
     * @param newCodecName     codec of the written chunk; {@code null} falls back to UNCOMPRESSED
     * @throws IOException if the column's maximum definition level is 0 (it cannot hold nulls) and
     *                     at least one value must be written, or if writing fails
     */
    private void addNullColumn(ColumnDescriptor descriptor, long totalChunkValues, EncodingStats encodingStats, ParquetFileWriter writer, MessageType schema, CompressionCodecName newCodecName) throws IOException {
        // A column with max definition level 0 cannot represent null. Checked once, before any
        // resource is allocated, instead of on every loop iteration.
        if (totalChunkValues > 0L && descriptor.getMaxDefinitionLevel() == 0) {
            throw new IOException("definition level is detected to be 0 for column " + String.join(".", descriptor.getPath()) + " to be nullified");
        }
        boolean usesV2Pages = encodingStats != null && encodingStats.usesV2Pages();
        ParquetProperties.WriterVersion writerVersion = usesV2Pages ? ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0;
        ParquetProperties props = ParquetProperties.builder().withWriterVersion(writerVersion).build();
        // processBlocksFromReader treats a null target codec as "keep the source codec", so null can reach here.
        CompressionCodecName codec = newCodecName == null ? CompressionCodecName.UNCOMPRESSED : newCodecName;
        CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
        try {
            CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(codec);
            MessageType newSchema = this.newSchema(schema, descriptor);
            ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled(), null, this.numBlocksRewritten);
            ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, (PageWriteStore)cPageStore);
            ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
            // A long counter: an int counter would wrap around for counts above Integer.MAX_VALUE.
            for (long i = 0L; i < totalChunkValues; ++i) {
                cWriter.writeNull(0, 0);
                cStore.endRecord();
            }
            cStore.flush();
            cPageStore.flushToFileWriter(writer);
            cStore.close();
            cWriter.close();
        } finally {
            // Hand the compressor obtained above back to the factory.
            codecFactory.release();
        }
    }

    /**
     * Lists the columns of {@code requiredSchema} whose path does not exist in {@code fileSchema}.
     *
     * @param requiredSchema schema the output must have
     * @param fileSchema     schema of the input file
     * @return the required columns absent from the file, in required-schema order
     */
    private List<ColumnDescriptor> missedColumns(MessageType requiredSchema, MessageType fileSchema) {
        List<ColumnDescriptor> absent = new ArrayList<ColumnDescriptor>();
        for (ColumnDescriptor column : requiredSchema.getColumns()) {
            if (!fileSchema.containsPath(column.getPath())) {
                absent.add(column);
            }
        }
        return absent;
    }

    /**
     * Builds a schema that contains only the given column, keeping its enclosing groups.
     *
     * @param schema     full schema
     * @param descriptor column to keep
     * @return a message type with the same name as {@code schema} and a single top-level field
     * @throws RuntimeException if no top-level group named like the first path element contains the column
     */
    private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
        String[] path = descriptor.getPath();
        Type leaf = schema.getType(path);
        if (path.length == 1) {
            return new MessageType(schema.getName(), new Type[]{leaf});
        }
        for (Type topLevel : schema.getFields()) {
            boolean candidate = !topLevel.isPrimitive() && path[0].equals(topLevel.getName());
            if (candidate) {
                Type pruned = this.extractField(topLevel.asGroupType(), leaf);
                if (pruned != null) {
                    return new MessageType(schema.getName(), new Type[]{pruned});
                }
            }
        }
        throw new RuntimeException("No field is found");
    }

    /**
     * Searches {@code candidate} depth-first for {@code targetField} and returns a copy of the
     * group chain that leads to it, with every group reduced to the single child on that chain.
     *
     * @param candidate   group to search
     * @param targetField field to locate (matched with {@code equals})
     * @return the pruned group, {@code targetField} itself if it equals {@code candidate},
     *         or {@code null} if it is not found
     */
    private Type extractField(GroupType candidate, Type targetField) {
        if (targetField.equals((Object)candidate)) {
            return targetField;
        }
        for (Type child : candidate.getFields()) {
            Type match;
            if (child.isPrimitive()) {
                match = child.equals((Object)targetField) ? targetField : null;
            } else {
                match = this.extractField(child.asGroupType(), targetField);
            }
            if (match != null) {
                return new GroupType(candidate.getRepetition(), candidate.getName(), new Type[]{match});
            }
        }
        return null;
    }

    /**
     * Rewrites a legacy three-level list path in place to the {@code list.element} naming.
     *
     * <p>First pass: the first {@code x.bag.array_element} occurrence becomes
     * {@code x.list.element} and the method returns. Second pass (only if the first found
     * nothing): for every prefix that resolves to a LIST type in the required schema and is
     * followed by {@code bag}, the two elements after the prefix become {@code list} and
     * {@code element}.
     *
     * <p>Both passes only act when two elements exist after the list field, so the path is never
     * left half-converted and no index is written out of range.
     *
     * @param path column path; modified in place when a conversion applies
     * @return {@code true} if the path was changed
     */
    @VisibleForTesting
    public boolean convertLegacy3LevelArray(String[] path) {
        for (int i = 0; i + 2 < path.length; ++i) {
            if ("bag".equals(path[i + 1]) && "array_element".equals(path[i + 2])) {
                path[i + 1] = "list";
                path[i + 2] = "element";
                return true;
            }
        }
        boolean changed = false;
        // Bound is i + 2 because both path[i + 1] and path[i + 2] are rewritten below.
        for (int i = 0; i + 2 < path.length; ++i) {
            try {
                String[] subPath = Arrays.copyOf(path, i + 1);
                Type type = this.requiredSchema.getType(subPath);
                if (type.getOriginalType() == OriginalType.LIST && "bag".equals(path[i + 1])) {
                    path[i + 1] = "list";
                    path[i + 2] = "element";
                    changed = true;
                }
            }
            catch (InvalidRecordException e) {
                // The prefix does not exist in the required schema; nothing to convert at this depth.
                LOG.debug("field not found due to schema evolution, nothing need to do");
            }
        }
        return changed;
    }

    /**
     * Rewrites a legacy map path in place: for every prefix that resolves to a MAP type in the
     * required schema and is followed by {@code map}, that element becomes {@code key_value}.
     *
     * <p>Lookup failures are treated as "nothing to convert at this depth". Only runtime
     * exceptions are tolerated; JVM errors propagate.
     *
     * @param path column path; modified in place when a conversion applies
     * @return {@code true} if the path was changed
     */
    public boolean convertLegacyMap(String[] path) {
        boolean changed = false;
        // The last element has no successor to rename, so it is not visited.
        for (int i = 0; i + 1 < path.length; ++i) {
            try {
                String[] subPath = Arrays.copyOf(path, i + 1);
                Type type = this.requiredSchema.getType(subPath);
                if (type.getOriginalType() == OriginalType.MAP && "map".equals(path[i + 1])) {
                    path[i + 1] = "key_value";
                    changed = true;
                }
            }
            catch (RuntimeException e) {
                // Best effort: the prefix could not be resolved against the required schema.
                LOG.debug("field not found due to schema evolution, nothing need to do");
            }
        }
        return changed;
    }

    private static final class DummyConverter
    extends PrimitiveConverter {
        private DummyConverter() {
        }

        public GroupConverter asGroupConverter() {
            return new DummyGroupConverter();
        }
    }

    private static final class DummyGroupConverter
    extends GroupConverter {
        private DummyGroupConverter() {
        }

        public void start() {
        }

        public void end() {
        }

        public Converter getConverter(int fieldIndex) {
            return new DummyConverter();
        }
    }
}

