/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexCommon;
import org.apache.paimon.fileindex.FileIndexFormat;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileIndexWriter;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;

public class RewriteFileIndexSink
extends FlinkWriteSink<ManifestEntry> {
    /**
     * Creates a sink that rewrites file indexes for the given table.
     *
     * @param table the table handed to the superclass and later used to build the write operator
     */
    public RewriteFileIndexSink(FileStoreTable table) {
        // The second superclass argument is deliberately null.
        // NOTE(review): presumably the "overwrite partition" spec of FlinkWriteSink - confirm.
        super(table, null);
    }

    /**
     * Builds the operator factory that rewrites file indexes for incoming manifest entries.
     *
     * <p>Neither {@code writeProvider} nor {@code commitUser} is used: the operator works directly
     * on the table and does not go through a {@link StoreSinkWrite}.
     *
     * @param writeProvider ignored
     * @param commitUser ignored
     * @return a factory producing the index-rewriting operator, configured from the table options
     */
    @Override
    protected OneInputStreamOperatorFactory<ManifestEntry, Committable> createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
        Options tableOptions = this.table.coreOptions().toConfiguration();
        return new FileIndexModificationOperatorFactory(tableOptions, this.table);
    }

    /**
     * Per-schema facts needed to rebuild the index of a data file written with that schema.
     * Instances are created and cached by {@link SchemaCache}.
     */
    private static class SchemaInfo {
        /** Row type of the schema the data file was written with. */
        private final RowType fileSchema;
        /** Current-table column name to file column name; {@code null} when the file uses the current schema. */
        private final Map<String, String> colNameMapping;
        /** Positions, within {@link #fileSchema}, of the columns that carry an index. */
        private final int[] projectedIndexCols;
        /** Names of the indexed columns, compared against the keys of an existing index file. */
        private final Set<String> projectedColFullNames;

        private SchemaInfo(RowType schemaOfFile, Map<String, String> nameMapping, int[] indexColPositions, Set<String> indexColFullNames) {
            this.projectedColFullNames = indexColFullNames;
            this.projectedIndexCols = indexColPositions;
            this.colNameMapping = nameMapping;
            this.fileSchema = schemaOfFile;
        }
    }

    /**
     * Lazily computes and caches one {@link SchemaInfo} per schema id.
     *
     * <p>The latest table schema is captured once at construction time; schemas of older data
     * files are resolved against it by field id so that renamed columns are still found.
     */
    private static class SchemaCache {
        private final FileIndexOptions fileIndexOptions;
        private final SchemaManager schemaManager;
        private final TableSchema currentSchema;
        private final Map<Long, SchemaInfo> schemaInfos;

        /**
         * @param fileIndexOptions configured index columns, expressed with current-table column names
         * @param schemaManager used to load the latest schema now and file schemas on demand
         * @throws RuntimeException if the schema manager reports no schema at all
         */
        public SchemaCache(FileIndexOptions fileIndexOptions, SchemaManager schemaManager) {
            this.fileIndexOptions = fileIndexOptions;
            this.schemaManager = schemaManager;
            this.currentSchema = schemaManager.latest().orElseThrow(() -> new RuntimeException("Cannot rewrite file index: the table has no schema."));
            this.schemaInfos = new HashMap<>();
        }

        /**
         * Returns the cached info for {@code schemaId}, building it on first request.
         *
         * @param schemaId id of the schema a data file was written with
         * @return the schema info; never {@code null}
         */
        public SchemaInfo schemaInfo(long schemaId) {
            SchemaInfo cached = this.schemaInfos.get(schemaId);
            if (cached != null) {
                return cached;
            }
            SchemaInfo built = this.buildSchemaInfo(schemaId);
            this.schemaInfos.put(schemaId, built);
            return built;
        }

        /** Resolves every configured index column against the file schema identified by {@code schemaId}. */
        private SchemaInfo buildSchemaInfo(long schemaId) {
            RowType fileSchema = this.schemaManager.schema(schemaId).logicalRowType();
            // No mapping is needed when the file was written with the current schema.
            Map<String, String> colNameMapping = schemaId == this.currentSchema.id() ? null : SchemaCache.createIndexNameMapping(this.currentSchema.fields(), fileSchema.getFields());
            List<String> projectedColNames = new ArrayList<>();
            Set<String> projectedColFullNames = new HashSet<>();
            for (Map.Entry<FileIndexOptions.Column, Map<String, Options>> entry : this.fileIndexOptions.entrySet()) {
                FileIndexOptions.Column column = entry.getKey();
                String columnName = column.getColumnName();
                if (colNameMapping != null) {
                    columnName = colNameMapping.get(columnName);
                    if (columnName == null) {
                        // The configured column has no counterpart in this file's schema.
                        continue;
                    }
                }
                projectedColNames.add(columnName);
                // Use the file-side name in both branches: the projection above and the nested
                // key are file-side, so the plain column must be too or a renamed column's name
                // would not line up with them.
                String fullColumnName = column.isNestedColumn() ? FileIndexCommon.toMapKey(columnName, column.getNestedColumnName()) : columnName;
                projectedColFullNames.add(fullColumnName);
            }
            int[] projectedIndexCols = projectedColNames.stream().mapToInt(fileSchema::getFieldIndex).toArray();
            return new SchemaInfo(fileSchema, colNameMapping, projectedIndexCols, projectedColFullNames);
        }

        /**
         * Maps current-table column names to the names the same fields (matched by field id) have
         * in a data file's schema. Fields of the file that no longer exist in the table are omitted.
         *
         * @param tableFields fields of the current table schema
         * @param dataFields fields of the data file's schema
         * @return table column name to file column name
         */
        private static Map<String, String> createIndexNameMapping(List<DataField> tableFields, List<DataField> dataFields) {
            Map<Integer, String> tableNameById = new HashMap<>();
            for (DataField tableField : tableFields) {
                tableNameById.put(tableField.id(), tableField.name());
            }
            Map<String, String> indexMapping = new HashMap<>();
            for (DataField dataField : dataFields) {
                String tableName = tableNameById.get(dataField.id());
                if (tableName != null) {
                    indexMapping.put(tableName, dataField.name());
                }
            }
            return indexMapping;
        }
    }

    /**
     * Rebuilds the file index of individual data files according to the table's current
     * index-column options.
     *
     * <p>Path factories (per partition/bucket) and schema information (per schema id) are cached
     * across calls, so one instance should be reused for all files of a table.
     */
    public static class FileIndexProcessor {
        private final FileStoreTable table;
        private final FileIndexOptions fileIndexOptions;
        private final FileIO fileIO;
        private final FileStorePathFactory pathFactory;
        private final Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap;
        private final SchemaCache schemaInfoCache;
        /** Serialized index size above which the index is written to its own file. */
        private final long sizeInMeta;

        /**
         * @param table table whose options, file IO, path factory and schemas drive the rewrite
         */
        public FileIndexProcessor(FileStoreTable table) {
            this.table = table;
            this.fileIndexOptions = table.coreOptions().indexColumnsOptions();
            this.fileIO = table.fileIO();
            this.pathFactory = table.store().pathFactory();
            this.dataFilePathFactoryMap = new HashMap<>();
            this.schemaInfoCache = new SchemaCache(this.fileIndexOptions, new SchemaManager(this.fileIO, table.location()));
            this.sizeInMeta = table.coreOptions().fileIndexInManifestThreshold();
        }

        /**
         * Recomputes the index of one data file and returns updated metadata for it.
         *
         * <p>Steps: load the existing {@code .index} extra file if there is one, drop entries for
         * columns that are no longer indexed, rebuild the configured indexes by reading the data
         * file, and serialize the merged result. If the serialized index is larger than the
         * configured threshold it is written to a separate file that is added to the extra files;
         * otherwise the bytes are passed to {@code DataFileMeta.copy(byte[])}.
         *
         * @param partition partition the file belongs to
         * @param bucket bucket the file belongs to
         * @param dataFileMeta metadata of the file to re-index
         * @return a copy of {@code dataFileMeta} reflecting the rewritten index
         * @throws IOException if reading the old index, reading the data file, or writing the new
         *     index fails
         */
        public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFileMeta) throws IOException {
            DataFilePathFactory dataFilePathFactory = this.dataFilePathFactoryMap.computeIfAbsent(Pair.of(partition, bucket), p -> this.pathFactory.createDataFilePathFactory(partition, bucket));
            SchemaInfo schemaInfo = this.schemaInfoCache.schemaInfo(dataFileMeta.schemaId());

            // Extra files without any index file; the new index file (if any) is appended below.
            List<String> extras = new ArrayList<>(dataFileMeta.extraFiles());
            List<String> indexFiles = dataFileMeta.extraFiles().stream().filter(name -> name.endsWith(".index")).collect(Collectors.toList());
            extras.removeAll(indexFiles);

            // Column name -> (index type -> serialized index).
            Map<String, Map<String, byte[]>> maintainers;
            Path newIndexPath;
            if (!indexFiles.isEmpty()) {
                // Only the first index file is read, even if several are listed.
                Path oldIndexPath = dataFilePathFactory.toAlignedPath(indexFiles.get(0), dataFileMeta);
                try (FileIndexFormat.Reader reader = FileIndexFormat.createReader(this.fileIO.newInputStream(oldIndexPath), schemaInfo.fileSchema)) {
                    maintainers = reader.readAll();
                }
                newIndexPath = DataFilePathFactory.createNewFileIndexFilePath(oldIndexPath);
            } else {
                maintainers = new HashMap<>();
                newIndexPath = DataFilePathFactory.dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta));
            }

            // Drop indexes of columns that are no longer configured.
            // NOTE(review): index types of a kept column are not pruned here, even when that
            // type is no longer configured - confirm whether that is intended.
            maintainers.keySet().retainAll(schemaInfo.projectedColFullNames);

            DataFileIndexWriter dataFileIndexWriter = DataFileIndexWriter.create(this.fileIO, newIndexPath, schemaInfo.fileSchema.project(schemaInfo.projectedIndexCols), this.fileIndexOptions, schemaInfo.colNameMapping);
            if (dataFileIndexWriter != null) {
                DataSplit split = DataSplit.builder().withPartition(partition).withBucket(bucket).withBucketPath(this.pathFactory.bucketPath(partition, bucket).toString()).withDataFiles(Collections.singletonList(dataFileMeta)).rawConvertible(true).build();
                try (RecordReader<InternalRow> recordReader = this.table.newReadBuilder().withProjection(schemaInfo.projectedIndexCols).newRead().createReader(split)) {
                    recordReader.forEachRemaining(dataFileIndexWriter::write);
                }
                // The writer is used only as an in-memory index builder and is deliberately not
                // closed. NOTE(review): closing it presumably writes the index file itself - confirm.
                // Freshly built indexes overwrite loaded ones of the same column and type.
                dataFileIndexWriter.serializeMaintainers().forEach((key, value) -> maintainers.computeIfAbsent(key, k -> new HashMap<>()).putAll(value));
            }

            ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            try (FileIndexFormat.Writer indexWriter = FileIndexFormat.createWriter(serialized)) {
                if (!maintainers.isEmpty()) {
                    indexWriter.writeColumnIndexes(maintainers);
                }
            }

            if (serialized.size() > this.sizeInMeta) {
                // Too large for the threshold: store the index as a separate extra file.
                try (PositionOutputStream outputStream = this.fileIO.newOutputStream(newIndexPath, true)) {
                    outputStream.write(serialized.toByteArray());
                }
                extras.add(newIndexPath.getName());
                return dataFileMeta.copy(extras);
            }
            if (serialized.size() == 0) {
                // Nothing was serialized: only the old index file is removed from the extras.
                return dataFileMeta.copy(extras);
            }
            return dataFileMeta.copy(serialized.toByteArray());
        }
    }

    /**
     * Operator that rewrites the file index of every incoming {@link ManifestEntry} and, at
     * prepare-commit time, emits one {@link Committable} per rewritten file.
     */
    private static class FileIndexModificationOperator
    extends PrepareCommitOperator<ManifestEntry, Committable> {
        private static final long serialVersionUID = 1L;
        private final transient FileIndexProcessor fileIndexProcessor;
        /** Commit messages collected since the last {@link #prepareCommit}. */
        private final transient List<CommitMessage> messages;

        private FileIndexModificationOperator(StreamOperatorParameters<Committable> parameters, Options options, FileStoreTable table) {
            super(parameters, options);
            this.messages = new ArrayList<>();
            this.fileIndexProcessor = new FileIndexProcessor(table);
        }

        /**
         * Re-indexes the data file described by the record and buffers a commit message that
         * replaces the old file metadata with the re-indexed one via a compact increment.
         */
        @Override
        public void processElement(StreamRecord<ManifestEntry> element) throws Exception {
            ManifestEntry manifestEntry = element.getValue();
            BinaryRow partition = manifestEntry.partition();
            int bucket = manifestEntry.bucket();
            DataFileMeta before = manifestEntry.file();
            DataFileMeta after = this.fileIndexProcessor.process(partition, bucket, before);

            // No new data is written; the change is expressed purely as "before -> after".
            DataIncrement noNewData = DataIncrement.emptyIncrement();
            CompactIncrement replacement = new CompactIncrement(Collections.singletonList(before), Collections.singletonList(after), Collections.emptyList());
            this.messages.add(new CommitMessageImpl(partition, bucket, noNewData, replacement));
        }

        /**
         * Drains the buffered commit messages into committables tagged with {@code checkpointId}.
         *
         * @param waitCompaction not used by this operator
         * @param checkpointId checkpoint the returned committables belong to
         */
        @Override
        protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
            List<CommitMessage> pending = new ArrayList<>(this.messages);
            this.messages.clear();
            List<Committable> committables = new ArrayList<>(pending.size());
            for (CommitMessage message : pending) {
                committables.add(new Committable(checkpointId, Committable.Kind.FILE, message));
            }
            return committables;
        }
    }

    private static class FileIndexModificationOperatorFactory
    extends PrepareCommitOperator.Factory<ManifestEntry, Committable> {
        private final FileStoreTable table;

        public FileIndexModificationOperatorFactory(Options options, FileStoreTable table) {
            super(options);
            this.table = table;
        }

        public <T extends StreamOperator<Committable>> T createStreamOperator(StreamOperatorParameters<Committable> parameters) {
            return (T)((Object)new FileIndexModificationOperator(parameters, this.options, this.table));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return FileIndexModificationOperator.class;
        }
    }
}

