/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.segment.local.segment.index.loader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.BigDecimalColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForwardIndexHandler
extends BaseIndexHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class);
    private static final List<ColumnIndexType> DICTIONARY_BASED_INDEXES_TO_REWRITE = Arrays.asList(ColumnIndexType.RANGE_INDEX, ColumnIndexType.FST_INDEX, ColumnIndexType.INVERTED_INDEX);
    private final Schema _schema;

    public ForwardIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, Schema schema) {
        super(segmentMetadata, indexLoadingConfig);
        this._schema = schema;
    }

    @Override
    public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) throws Exception {
        Map<String, Operation> columnOperationMap = this.computeOperation(segmentReader);
        return !columnOperationMap.isEmpty();
    }

    @Override
    public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) throws Exception {
        Map<String, Operation> columnOperationMap = this.computeOperation((SegmentDirectory.Reader)segmentWriter);
        if (columnOperationMap.isEmpty()) {
            return;
        }
        block9: for (Map.Entry<String, Operation> entry : columnOperationMap.entrySet()) {
            String column = entry.getKey();
            Operation operation = entry.getValue();
            switch (operation) {
                case DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN: {
                    this._tmpForwardIndexColumns.add(column);
                    continue block9;
                }
                case DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN: {
                    this.createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
                    if (!segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX)) {
                        throw new IOException(String.format("Temporary forward index was not created for column: %s", column));
                    }
                    this._tmpForwardIndexColumns.add(column);
                    continue block9;
                }
                case ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN: {
                    this.createForwardIndexIfNeeded(segmentWriter, this._segmentMetadata.getColumnMetadataFor(column), indexCreatorProvider, false);
                    if (segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) continue block9;
                    throw new IOException(String.format("Dictionary should still exist after rebuilding forward index for dictionary column: %s", column));
                }
                case ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN: {
                    this.createForwardIndexIfNeeded(segmentWriter, this._segmentMetadata.getColumnMetadataFor(column), indexCreatorProvider, false);
                    if (!segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) continue block9;
                    throw new IOException(String.format("Dictionary should not exist after rebuilding forward index for raw column: %s", column));
                }
                case ENABLE_DICTIONARY: {
                    this.createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
                    continue block9;
                }
                case DISABLE_DICTIONARY: {
                    this.disableDictionaryAndCreateRawForwardIndex(column, segmentWriter, indexCreatorProvider);
                    continue block9;
                }
                case CHANGE_RAW_INDEX_COMPRESSION_TYPE: {
                    this.rewriteRawForwardIndexForCompressionChange(column, segmentWriter, indexCreatorProvider);
                    continue block9;
                }
            }
            throw new IllegalStateException("Unsupported operation for column " + column);
        }
    }

    @VisibleForTesting
    Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader) throws Exception {
        HashMap<String, Operation> columnOperationMap = new HashMap<String, Operation>();
        if (this._segmentMetadata.getVersion().compareTo((Enum)SegmentVersion.v3) < 0) {
            return columnOperationMap;
        }
        Set existingAllColumns = this._segmentMetadata.getAllColumns();
        Set existingDictColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.DICTIONARY);
        HashSet<String> existingNoDictColumns = new HashSet<String>();
        for (String column : existingAllColumns) {
            if (existingDictColumns.contains(column)) continue;
            existingNoDictColumns.add(column);
        }
        Set existingForwardIndexColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX);
        HashSet<String> existingForwardIndexDisabledColumns = new HashSet<String>();
        for (String column : existingAllColumns) {
            if (existingForwardIndexColumns.contains(column)) continue;
            existingForwardIndexDisabledColumns.add(column);
        }
        Set<String> newNoDictColumns = this._indexLoadingConfig.getNoDictionaryColumns();
        Set<String> newForwardIndexDisabledColumns = this._indexLoadingConfig.getForwardIndexDisabledColumns();
        for (String column : existingAllColumns) {
            ColumnMetadata columnMetadata;
            if (existingForwardIndexColumns.contains(column) && newForwardIndexDisabledColumns.contains(column)) {
                Preconditions.checkState((!newNoDictColumns.contains(column) ? 1 : 0) != 0, (Object)String.format("Must enable dictionary to disable the forward index for column: %s", column));
                Preconditions.checkState((boolean)this._indexLoadingConfig.getInvertedIndexColumns().contains(column), (Object)String.format("Must enable inverted index to disable the forward index for column: %s", column));
                columnMetadata = this._segmentMetadata.getColumnMetadataFor(column);
                if (columnMetadata.isSorted()) {
                    LOGGER.warn("Trying to disable the forward index for a sorted column {}, ignoring", (Object)column);
                    continue;
                }
                if (existingDictColumns.contains(column)) {
                    columnOperationMap.put(column, Operation.DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
                    continue;
                }
                columnOperationMap.put(column, Operation.DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
                continue;
            }
            if (existingForwardIndexDisabledColumns.contains(column) && !newForwardIndexDisabledColumns.contains(column)) {
                columnMetadata = this._segmentMetadata.getColumnMetadataFor(column);
                if (columnMetadata != null && columnMetadata.isSorted()) {
                    LOGGER.warn("Trying to enable the forward index for a sorted column {}, ignoring", (Object)column);
                    continue;
                }
                if (newNoDictColumns.contains(column)) {
                    Preconditions.checkState((!this._indexLoadingConfig.getInvertedIndexColumns().contains(column) ? 1 : 0) != 0, (Object)String.format("Must disable inverted index to enable the forward index as noDictionary for column: %s", column));
                    columnOperationMap.put(column, Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
                    continue;
                }
                columnOperationMap.put(column, Operation.ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
                continue;
            }
            if (existingForwardIndexDisabledColumns.contains(column) && newForwardIndexDisabledColumns.contains(column)) {
                Preconditions.checkState((existingDictColumns.contains(column) && !newNoDictColumns.contains(column) ? 1 : 0) != 0, (Object)String.format("Not allowed to disable the dictionary for a column: %s without forward index", column));
                continue;
            }
            if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {
                if (this._schema == null || this._indexLoadingConfig.getTableConfig() == null) {
                    LOGGER.warn("Cannot enable dictionary for column={} as schema or tableConfig is null.", (Object)column);
                    continue;
                }
                ColumnMetadata existingColMetadata = this._segmentMetadata.getColumnMetadataFor(column);
                Preconditions.checkState((!existingColMetadata.isSorted() ? 1 : 0) != 0, (Object)("Raw column=" + column + " cannot be sorted."));
                columnOperationMap.put(column, Operation.ENABLE_DICTIONARY);
                continue;
            }
            if (existingDictColumns.contains(column) && newNoDictColumns.contains(column)) {
                if (!this.shouldDisableDictionary(column, this._segmentMetadata.getColumnMetadataFor(column))) continue;
                columnOperationMap.put(column, Operation.DISABLE_DICTIONARY);
                continue;
            }
            if (!existingNoDictColumns.contains(column) || !newNoDictColumns.contains(column) || !this.shouldChangeCompressionType(column, segmentReader)) continue;
            columnOperationMap.put(column, Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
        }
        return columnOperationMap;
    }

    private boolean shouldDisableDictionary(String column, ColumnMetadata existingColumnMetadata) {
        if (this._schema == null || this._indexLoadingConfig.getTableConfig() == null) {
            LOGGER.warn("Cannot disable dictionary for column={} as schema or tableConfig is null.", (Object)column);
            return false;
        }
        if (existingColumnMetadata.isSorted()) {
            LOGGER.warn("Cannot disable dictionary for column={} as it is sorted.", (Object)column);
            return false;
        }
        if (this._indexLoadingConfig.getInvertedIndexColumns().contains(column) || this._indexLoadingConfig.getFSTIndexColumns().contains(column)) {
            LOGGER.warn("Cannot disable dictionary as column={} has FST index or inverted index or both.", (Object)column);
            return false;
        }
        return true;
    }

    private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader) throws Exception {
        ColumnMetadata existingColMetadata = this._segmentMetadata.getColumnMetadataFor(column);
        try (ForwardIndexReader<?> fwdIndexReader = LoaderUtils.getForwardIndexReader(segmentReader, existingColMetadata);){
            ChunkCompressionType existingCompressionType = fwdIndexReader.getCompressionType();
            Preconditions.checkState((existingCompressionType != null ? 1 : 0) != 0, (Object)("Existing compressionType cannot be null for raw forward index column=" + column));
            ChunkCompressionType newCompressionType = null;
            Map<String, ChunkCompressionType> newCompressionConfigs = this._indexLoadingConfig.getCompressionConfigs();
            if (newCompressionConfigs.containsKey(column)) {
                newCompressionType = newCompressionConfigs.get(column);
            }
            boolean bl = newCompressionType != null && existingCompressionType != newCompressionType;
            return bl;
        }
    }

    private void rewriteRawForwardIndexForCompressionChange(String column, SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) throws Exception {
        ColumnMetadata existingColMetadata = this._segmentMetadata.getColumnMetadataFor(column);
        boolean isSingleValue = existingColMetadata.isSingleValue();
        File indexDir = this._segmentMetadata.getIndexDir();
        String segmentName = this._segmentMetadata.getName();
        File inProgress = new File(indexDir, column + ".fwd.inprogress");
        String fileExtension = isSingleValue ? ".sv.raw.fwd" : ".mv.raw.fwd";
        File fwdIndexFile = new File(indexDir, column + fileExtension);
        if (!inProgress.exists()) {
            FileUtils.touch((File)inProgress);
        } else {
            FileUtils.deleteQuietly((File)fwdIndexFile);
        }
        LOGGER.info("Creating new forward index for segment={} and column={}", (Object)segmentName, (Object)column);
        Map<String, ChunkCompressionType> compressionConfigs = this._indexLoadingConfig.getCompressionConfigs();
        Preconditions.checkState((boolean)compressionConfigs.containsKey(column));
        ChunkCompressionType newCompressionType = compressionConfigs.get(column);
        if (isSingleValue) {
            this.rewriteRawSVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter, indexCreatorProvider, newCompressionType);
        } else {
            this.rewriteRawMVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter, indexCreatorProvider, newCompressionType);
        }
        segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
        LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
        FileUtils.deleteQuietly((File)inProgress);
        LOGGER.info("Created forward index for segment: {}, column: {}", (Object)segmentName, (Object)column);
    }

    private void rewriteRawMVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata, File indexDir, SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider, ChunkCompressionType newCompressionType) throws Exception {
        try (ForwardIndexReader<?> reader = LoaderUtils.getForwardIndexReader((SegmentDirectory.Reader)segmentWriter, existingColMetadata);){
            int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
            int maxNumberOfMVEntries = existingColMetadata.getMaxNumberOfMultiValues();
            int maxRowLengthInBytes = MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumberOfMVEntries);
            IndexCreationContext.Forward context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata).withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build().forForwardIndex(newCompressionType, this._indexLoadingConfig.getColumnProperties());
            try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context);){
                if (!reader.getStoredType().equals((Object)creator.getValueType())) {
                    String failureMsg = "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString() + " to " + creator.getValueType().toString();
                    throw new UnsupportedOperationException(failureMsg);
                }
                int numDocs = existingColMetadata.getTotalDocs();
                this.forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null);
            }
        }
    }

    private void rewriteRawSVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata, File indexDir, SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider, ChunkCompressionType newCompressionType) throws Exception {
        try (ForwardIndexReader<?> reader = LoaderUtils.getForwardIndexReader((SegmentDirectory.Reader)segmentWriter, existingColMetadata);){
            int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
            IndexCreationContext.Forward context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata).withLengthOfLongestEntry(lengthOfLongestEntry).build().forForwardIndex(newCompressionType, this._indexLoadingConfig.getColumnProperties());
            try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context);){
                if (!reader.getStoredType().equals((Object)creator.getValueType())) {
                    String failureMsg = "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString() + " to " + creator.getValueType().toString();
                    throw new UnsupportedOperationException(failureMsg);
                }
                int numDocs = existingColMetadata.getTotalDocs();
                this.forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null);
            }
        }
    }

    private void forwardIndexRewriteHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs, @Nullable SegmentDictionaryCreator dictionaryCreator, @Nullable Dictionary dictionaryReader) {
        if (dictionaryReader == null && dictionaryCreator == null) {
            this.forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs);
        } else if (dictionaryReader != null && dictionaryCreator == null) {
            this.forwardIndexReadDictWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryReader);
        } else if (dictionaryReader == null && dictionaryCreator != null) {
            this.forwardIndexReadRawWriteDictHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryCreator);
        } else {
            Preconditions.checkState((boolean)false, (Object)("Invalid dict-based read/write for column=" + column));
        }
    }

    private void forwardIndexReadRawWriteRawHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs) {
        ForwardIndexReaderContext readerContext = reader.createContext();
        boolean isSVColumn = reader.isSingleValue();
        switch (reader.getStoredType()) {
            case INT: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        int val = reader.getInt(i, readerContext);
                        creator.putInt(val);
                        continue;
                    }
                    int[] ints = reader.getIntMV(i, readerContext);
                    creator.putIntMV(ints);
                }
                break;
            }
            case LONG: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        long val = reader.getLong(i, readerContext);
                        creator.putLong(val);
                        continue;
                    }
                    long[] longs = reader.getLongMV(i, readerContext);
                    creator.putLongMV(longs);
                }
                break;
            }
            case FLOAT: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        float val = reader.getFloat(i, readerContext);
                        creator.putFloat(val);
                        continue;
                    }
                    float[] floats = reader.getFloatMV(i, readerContext);
                    creator.putFloatMV(floats);
                }
                break;
            }
            case DOUBLE: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        double val = reader.getDouble(i, readerContext);
                        creator.putDouble(val);
                        continue;
                    }
                    double[] doubles = reader.getDoubleMV(i, readerContext);
                    creator.putDoubleMV(doubles);
                }
                break;
            }
            case STRING: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        String val = reader.getString(i, readerContext);
                        creator.putString(val);
                        continue;
                    }
                    String[] strings = reader.getStringMV(i, readerContext);
                    creator.putStringMV(strings);
                }
                break;
            }
            case BYTES: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        byte[] val = reader.getBytes(i, readerContext);
                        creator.putBytes(val);
                        continue;
                    }
                    byte[][] bytesArray = reader.getBytesMV(i, readerContext);
                    creator.putBytesMV(bytesArray);
                }
                break;
            }
            case BIG_DECIMAL: {
                Preconditions.checkState((boolean)isSVColumn, (Object)"BigDecimal is not supported for MV columns");
                for (int i = 0; i < numDocs; ++i) {
                    BigDecimal val = reader.getBigDecimal(i, readerContext);
                    creator.putBigDecimal(val);
                }
                break;
            }
            default: {
                throw new IllegalStateException("Unsupported storedType=" + reader.getStoredType() + " for column=" + column);
            }
        }
    }

    private void forwardIndexReadDictWriteRawHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs, Dictionary dictionaryReader) {
        ForwardIndexReaderContext readerContext = reader.createContext();
        boolean isSVColumn = reader.isSingleValue();
        FieldSpec.DataType storedType = dictionaryReader.getValueType().getStoredType();
        switch (storedType) {
            case INT: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        int dictId = reader.getDictId(i, readerContext);
                        int val = dictionaryReader.getIntValue(dictId);
                        creator.putInt(val);
                        continue;
                    }
                    int[] dictIds = reader.getDictIdMV(i, readerContext);
                    int[] ints = new int[dictIds.length];
                    dictionaryReader.readIntValues(dictIds, dictIds.length, ints);
                    creator.putIntMV(ints);
                }
                break;
            }
            case LONG: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        int dictId = reader.getDictId(i, readerContext);
                        long val = dictionaryReader.getLongValue(dictId);
                        creator.putLong(val);
                        continue;
                    }
                    int[] dictIds = reader.getDictIdMV(i, readerContext);
                    long[] longs = new long[dictIds.length];
                    dictionaryReader.readLongValues(dictIds, dictIds.length, longs);
                    creator.putLongMV(longs);
                }
                break;
            }
            case FLOAT: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        int dictId = reader.getDictId(i, readerContext);
                        float val = dictionaryReader.getFloatValue(dictId);
                        creator.putFloat(val);
                        continue;
                    }
                    int[] dictIds = reader.getDictIdMV(i, readerContext);
                    float[] floats = new float[dictIds.length];
                    dictionaryReader.readFloatValues(dictIds, dictIds.length, floats);
                    creator.putFloatMV(floats);
                }
                break;
            }
            case DOUBLE: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        int dictId = reader.getDictId(i, readerContext);
                        double val = dictionaryReader.getDoubleValue(dictId);
                        creator.putDouble(val);
                        continue;
                    }
                    int[] dictIds = reader.getDictIdMV(i, readerContext);
                    double[] doubles = new double[dictIds.length];
                    dictionaryReader.readDoubleValues(dictIds, dictIds.length, doubles);
                    creator.putDoubleMV(doubles);
                }
                break;
            }
            case BYTES: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        int dictId = reader.getDictId(i, readerContext);
                        byte[] val = dictionaryReader.getBytesValue(dictId);
                        creator.putBytes(val);
                        continue;
                    }
                    int[] dictIds = reader.getDictIdMV(i, readerContext);
                    byte[][] bytes = new byte[dictIds.length][];
                    dictionaryReader.readBytesValues(dictIds, dictIds.length, (byte[][])bytes);
                    creator.putBytesMV((byte[][])bytes);
                }
                break;
            }
            case STRING: {
                for (int i = 0; i < numDocs; ++i) {
                    if (isSVColumn) {
                        int dictId = reader.getDictId(i, readerContext);
                        String val = dictionaryReader.getStringValue(dictId);
                        creator.putString(val);
                        continue;
                    }
                    int[] dictIds = reader.getDictIdMV(i, readerContext);
                    String[] strings = new String[dictIds.length];
                    dictionaryReader.readStringValues(dictIds, dictIds.length, strings);
                    creator.putStringMV(strings);
                }
                break;
            }
            case BIG_DECIMAL: {
                Preconditions.checkState((boolean)isSVColumn, (Object)"BigDecimal is not supported for MV columns");
                for (int i = 0; i < numDocs; ++i) {
                    int dictId = reader.getDictId(i, readerContext);
                    BigDecimal val = dictionaryReader.getBigDecimalValue(dictId);
                    creator.putBigDecimal(val);
                }
                break;
            }
            default: {
                throw new IllegalStateException("Unsupported storedType=" + storedType + " for column=" + column);
            }
        }
    }

    private void forwardIndexReadRawWriteDictHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader, ForwardIndexCreator creator, int numDocs, SegmentDictionaryCreator dictionaryCreator) {
        boolean isSVColumn = reader.isSingleValue();
        int maxNumValuesPerEntry = existingColumnMetadata.getMaxNumberOfMultiValues();
        PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry);
        for (int i = 0; i < numDocs; ++i) {
            Object obj = columnReader.getValue(i);
            if (isSVColumn) {
                int dictId = dictionaryCreator.indexOfSV(obj);
                creator.putDictId(dictId);
                continue;
            }
            int[] dictIds = dictionaryCreator.indexOfMV(obj);
            creator.putDictIdMV(dictIds);
        }
    }

    private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) throws Exception {
        ColumnMetadata existingColMetadata = this._segmentMetadata.getColumnMetadataFor(column);
        boolean isSingleValue = existingColMetadata.isSingleValue();
        File indexDir = this._segmentMetadata.getIndexDir();
        String segmentName = this._segmentMetadata.getName();
        File inProgress = new File(indexDir, column + ".dict.inprogress");
        File dictionaryFile = new File(indexDir, column + ".dict");
        String fwdIndexFileExtension = isSingleValue ? ".sv.unsorted.fwd" : ".mv.fwd";
        File fwdIndexFile = new File(indexDir, column + fwdIndexFileExtension);
        if (!inProgress.exists()) {
            FileUtils.touch((File)inProgress);
        } else {
            FileUtils.deleteQuietly((File)fwdIndexFile);
            FileUtils.deleteQuietly((File)dictionaryFile);
        }
        LOGGER.info("Creating a new dictionary for segment={} and column={}", (Object)segmentName, (Object)column);
        AbstractColumnStatisticsCollector statsCollector = this.getStatsCollector(column, existingColMetadata.getDataType().getStoredType());
        SegmentDictionaryCreator dictionaryCreator = this.buildDictionary(column, existingColMetadata, segmentWriter, statsCollector);
        LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile, ColumnIndexType.DICTIONARY);
        LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index for segment={} and column={}", (Object)segmentName, (Object)column);
        this.writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter, indexDir, indexCreatorProvider, dictionaryCreator);
        segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
        LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
        LOGGER.info("Created forwardIndex. Updating metadata properties for segment={} and column={}", (Object)segmentName, (Object)column);
        HashMap<String, String> metadataProperties = new HashMap<String, String>();
        metadataProperties.put(V1Constants.MetadataKeys.Column.getKeyFor((String)column, (String)"hasDictionary"), String.valueOf(true));
        metadataProperties.put(V1Constants.MetadataKeys.Column.getKeyFor((String)column, (String)"lengthOfEachEntry"), String.valueOf(dictionaryCreator.getNumBytesPerEntry()));
        metadataProperties.put(V1Constants.MetadataKeys.Column.getKeyFor((String)column, (String)"cardinality"), String.valueOf(statsCollector.getCardinality()));
        SegmentMetadataUtils.updateMetadataProperties((SegmentMetadata)this._segmentMetadata, metadataProperties);
        ForwardIndexHandler.removeDictRelatedIndexes(column, segmentWriter);
        FileUtils.deleteQuietly((File)inProgress);
        LOGGER.info("Created dictionary based forward index for segment: {}, column: {}", (Object)segmentName, (Object)column);
    }

    private SegmentDictionaryCreator buildDictionary(String column, ColumnMetadata existingColMetadata, SegmentDirectory.Writer segmentWriter, AbstractColumnStatisticsCollector statsCollector) throws Exception {
        int numDocs = existingColMetadata.getTotalDocs();
        try (ForwardIndexReader<?> reader = LoaderUtils.getForwardIndexReader((SegmentDirectory.Reader)segmentWriter, existingColMetadata);){
            PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());
            for (int i = 0; i < numDocs; ++i) {
                Object obj = columnReader.getValue(i);
                statsCollector.collect(obj);
            }
            statsCollector.seal();
            boolean useVarLength = SegmentIndexCreationDriverImpl.shouldUseVarLengthDictionary(column, this._indexLoadingConfig.getVarLengthDictionaryColumns(), reader.getStoredType(), statsCollector);
            SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(), this._segmentMetadata.getIndexDir(), useVarLength);
            dictionaryCreator.build(statsCollector.getUniqueValuesSet());
            SegmentDictionaryCreator segmentDictionaryCreator = dictionaryCreator;
            return segmentDictionaryCreator;
        }
    }

    private void writeDictEnabledForwardIndex(String column, ColumnMetadata existingColMetadata, SegmentDirectory.Writer segmentWriter, File indexDir, IndexCreatorProvider indexCreatorProvider, SegmentDictionaryCreator dictionaryCreator) throws Exception {
        try (ForwardIndexReader<?> reader = LoaderUtils.getForwardIndexReader((SegmentDirectory.Reader)segmentWriter, existingColMetadata);){
            int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
            IndexCreationContext.Builder builder = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata).withLengthOfLongestEntry(lengthOfLongestEntry);
            builder.withDictionary(true);
            IndexCreationContext.Forward context = builder.build().forForwardIndex(null, this._indexLoadingConfig.getColumnProperties());
            try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context);){
                int numDocs = existingColMetadata.getTotalDocs();
                this.forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, dictionaryCreator, null);
            }
        }
    }

    static void removeDictRelatedIndexes(String column, SegmentDirectory.Writer segmentWriter) {
        DICTIONARY_BASED_INDEXES_TO_REWRITE.forEach(index -> segmentWriter.removeIndex(column, index));
    }

    private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) throws Exception {
        ColumnMetadata existingColMetadata = this._segmentMetadata.getColumnMetadataFor(column);
        boolean isSingleValue = existingColMetadata.isSingleValue();
        File indexDir = this._segmentMetadata.getIndexDir();
        String segmentName = this._segmentMetadata.getName();
        File inProgress = new File(indexDir, column + ".fwd.inprogress");
        String fileExtension = isSingleValue ? ".sv.raw.fwd" : ".mv.raw.fwd";
        File fwdIndexFile = new File(indexDir, column + fileExtension);
        if (!inProgress.exists()) {
            FileUtils.touch((File)inProgress);
        } else {
            FileUtils.deleteQuietly((File)fwdIndexFile);
        }
        LOGGER.info("Creating raw forward index for segment={} and column={}", (Object)segmentName, (Object)column);
        this.rewriteDictToRawForwardIndex(column, existingColMetadata, segmentWriter, indexDir, indexCreatorProvider);
        segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
        segmentWriter.removeIndex(column, ColumnIndexType.DICTIONARY);
        LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
        LOGGER.info("Created raw forwardIndex. Updating metadata properties for segment={} and column={}", (Object)segmentName, (Object)column);
        HashMap<String, String> metadataProperties = new HashMap<String, String>();
        metadataProperties.put(V1Constants.MetadataKeys.Column.getKeyFor((String)column, (String)"hasDictionary"), String.valueOf(false));
        metadataProperties.put(V1Constants.MetadataKeys.Column.getKeyFor((String)column, (String)"lengthOfEachEntry"), String.valueOf(0));
        SegmentMetadataUtils.updateMetadataProperties((SegmentMetadata)this._segmentMetadata, metadataProperties);
        ForwardIndexHandler.removeDictRelatedIndexes(column, segmentWriter);
        FileUtils.deleteQuietly((File)inProgress);
        LOGGER.info("Created raw based forward index for segment: {}, column: {}", (Object)segmentName, (Object)column);
    }

    private void rewriteDictToRawForwardIndex(String column, ColumnMetadata existingColMetadata, SegmentDirectory.Writer segmentWriter, File indexDir, IndexCreatorProvider indexCreatorProvider) throws Exception {
        try (ForwardIndexReader<?> reader = LoaderUtils.getForwardIndexReader((SegmentDirectory.Reader)segmentWriter, existingColMetadata);){
            BaseImmutableDictionary dictionary = LoaderUtils.getDictionary((SegmentDirectory.Reader)segmentWriter, existingColMetadata);
            IndexCreationContext.Builder builder = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata);
            builder.withDictionary(false);
            if (existingColMetadata.isSingleValue()) {
                int lengthOfLongestEntry = existingColMetadata.getColumnMaxLength();
                builder.withLengthOfLongestEntry(lengthOfLongestEntry);
            } else {
                FieldSpec.DataType dataType = existingColMetadata.getDataType();
                boolean isFixedWidth = dataType.getStoredType().isFixedWidth();
                if (!isFixedWidth) {
                    int maxRowLength = this.getMaxRowLengthForMVColumn(column, reader, dictionary);
                    builder.withMaxRowLengthInBytes(maxRowLength);
                }
            }
            Map<String, ChunkCompressionType> compressionConfigs = this._indexLoadingConfig.getCompressionConfigs();
            ChunkCompressionType compressionType = compressionConfigs.containsKey(column) ? compressionConfigs.get(column) : SegmentColumnarIndexCreator.getDefaultCompressionType(existingColMetadata.getFieldType());
            IndexCreationContext.Forward context = builder.build().forForwardIndex(compressionType, this._indexLoadingConfig.getColumnProperties());
            try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context);){
                int numDocs = existingColMetadata.getTotalDocs();
                this.forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, dictionary);
            }
        }
    }

    private int getMaxRowLengthForMVColumn(String column, ForwardIndexReader reader, Dictionary dictionary) throws Exception {
        ColumnMetadata existingColMetadata = this._segmentMetadata.getColumnMetadataFor(column);
        AbstractColumnStatisticsCollector statsCollector = this.getStatsCollector(column, dictionary.getValueType().getStoredType());
        PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, dictionary, null, existingColMetadata.getMaxNumberOfMultiValues());
        int numDocs = existingColMetadata.getTotalDocs();
        Preconditions.checkState((!existingColMetadata.getDataType().getStoredType().isFixedWidth() ? 1 : 0) != 0, (Object)("Column " + column + "is not a fixed width column."));
        Preconditions.checkState((!existingColMetadata.isSingleValue() ? 1 : 0) != 0, (Object)("Column " + column + "is not MV."));
        for (int i = 0; i < numDocs; ++i) {
            Object obj = columnReader.getValue(i);
            statsCollector.collect(obj);
        }
        statsCollector.seal();
        return statsCollector.getMaxRowLengthInBytes();
    }

    private AbstractColumnStatisticsCollector getStatsCollector(String column, FieldSpec.DataType storedType) throws Exception {
        AbstractColumnStatisticsCollector statsCollector;
        StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(this._indexLoadingConfig.getTableConfig(), this._schema, null);
        switch (storedType) {
            case INT: {
                statsCollector = new IntColumnPreIndexStatsCollector(column, statsCollectorConfig);
                break;
            }
            case LONG: {
                statsCollector = new LongColumnPreIndexStatsCollector(column, statsCollectorConfig);
                break;
            }
            case FLOAT: {
                statsCollector = new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig);
                break;
            }
            case DOUBLE: {
                statsCollector = new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig);
                break;
            }
            case STRING: {
                statsCollector = new StringColumnPreIndexStatsCollector(column, statsCollectorConfig);
                break;
            }
            case BYTES: {
                statsCollector = new BytesColumnPredIndexStatsCollector(column, statsCollectorConfig);
                break;
            }
            case BIG_DECIMAL: {
                statsCollector = new BigDecimalColumnPreIndexStatsCollector(column, statsCollectorConfig);
                break;
            }
            default: {
                throw new IllegalStateException("Unsupported storedType=" + storedType.toString() + " for column=" + column);
            }
        }
        return statsCollector;
    }

    protected static enum Operation {
        DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN,
        DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN,
        ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN,
        ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN,
        ENABLE_DICTIONARY,
        DISABLE_DICTIONARY,
        CHANGE_RAW_INDEX_COMPRESSION_TYPE;

    }
}

