/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.InternalRowKeyAndBucketExtractor;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

public class ChangelogWithKeyFileStoreTable
extends AbstractFileStoreTable {
    private static final long serialVersionUID = 1L;
    private transient KeyValueFileStore lazyStore;

    ChangelogWithKeyFileStoreTable(FileIO fileIO, Path path, TableSchema tableSchema) {
        super(fileIO, path, tableSchema);
    }

    @Override
    protected FileStoreTable copy(TableSchema newTableSchema) {
        return new ChangelogWithKeyFileStoreTable(this.fileIO, this.path, newTableSchema);
    }

    public KeyValueFileStore store() {
        if (this.lazyStore == null) {
            MergeFunctionFactory<KeyValue> mfFactory;
            RowType rowType = this.tableSchema.logicalRowType();
            Options conf = Options.fromMap(this.tableSchema.options());
            CoreOptions options = new CoreOptions(conf);
            CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
            switch (mergeEngine) {
                case DEDUPLICATE: {
                    mfFactory = DeduplicateMergeFunction.factory();
                    break;
                }
                case PARTIAL_UPDATE: {
                    mfFactory = PartialUpdateMergeFunction.factory(conf.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE), rowType.getFieldTypes());
                    break;
                }
                case AGGREGATE: {
                    mfFactory = AggregateMergeFunction.factory(conf, this.tableSchema.fieldNames(), rowType.getFieldTypes(), this.tableSchema.primaryKeys());
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
                }
            }
            if (options.changelogProducer() == CoreOptions.ChangelogProducer.LOOKUP) {
                mfFactory = LookupMergeFunction.wrap(mfFactory);
            }
            ChangelogWithKeyKeyValueFieldsExtractor extractor = ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
            this.lazyStore = new KeyValueFileStore(this.fileIO(), this.schemaManager(), this.tableSchema.id(), options, this.tableSchema.logicalPartitionType(), ChangelogWithKeyFileStoreTable.addKeyNamePrefix(this.tableSchema.logicalBucketKeyType()), new RowType(extractor.keyFields(this.tableSchema)), rowType, extractor, mfFactory);
        }
        return this.lazyStore;
    }

    private static RowType addKeyNamePrefix(RowType type) {
        return new RowType(type.getFields().stream().map(f -> new DataField(f.id(), "_KEY_" + f.name(), f.type(), f.description())).collect(Collectors.toList()));
    }

    private static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
        return keyFields.stream().map(f -> new DataField(f.id(), "_KEY_" + f.name(), f.type(), f.description())).collect(Collectors.toList());
    }

    @Override
    public SplitGenerator splitGenerator() {
        return new MergeTreeSplitGenerator(this.store().newKeyComparator(), this.store().options().splitTargetSize(), this.store().options().splitOpenFileCost());
    }

    @Override
    public boolean supportStreamingReadOverwrite() {
        return new CoreOptions(this.tableSchema.options()).streamingReadOverwrite();
    }

    @Override
    public BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
        return (scan, predicate) -> {
            List<Predicate> keyFilters = PredicateBuilder.pickTransformFieldMapping(PredicateBuilder.splitAnd(predicate), this.tableSchema.fieldNames(), this.tableSchema.trimmedPrimaryKeys());
            if (keyFilters.size() > 0) {
                ((KeyValueFileStoreScan)scan).withKeyFilter(PredicateBuilder.and(keyFilters));
            }
        };
    }

    @Override
    public InnerTableRead newRead() {
        return new KeyValueTableRead(this.store().newRead()){

            @Override
            public InnerTableRead withFilter(Predicate predicate) {
                this.read.withFilter(predicate);
                return this;
            }

            @Override
            public InnerTableRead withProjection(int[][] projection) {
                this.read.withValueProjection(projection);
                return this;
            }

            @Override
            protected RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
                return new ValueContentRowDataRecordIterator(kvRecordIterator);
            }
        };
    }

    @Override
    public TableWriteImpl<KeyValue> newWrite(String commitUser) {
        return this.newWrite(commitUser, null);
    }

    public TableWriteImpl<KeyValue> newWrite(String commitUser, ManifestCacheFilter manifestFilter) {
        SequenceGenerator sequenceGenerator = this.store().options().sequenceField().map(field -> new SequenceGenerator((String)field, this.schema().logicalRowType())).orElse(null);
        KeyValue kv = new KeyValue();
        return new TableWriteImpl<KeyValue>(this.store().newWrite(commitUser, manifestFilter), new InternalRowKeyAndBucketExtractor(this.tableSchema), record -> {
            long sequenceNumber = sequenceGenerator == null ? -1L : sequenceGenerator.generate(record.row());
            return kv.replace(record.primaryKey(), sequenceNumber, record.row().getRowKind(), record.row());
        });
    }

    static class ChangelogWithKeyKeyValueFieldsExtractor
    implements KeyValueFieldsExtractor {
        private static final long serialVersionUID = 1L;
        static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR = new ChangelogWithKeyKeyValueFieldsExtractor();

        private ChangelogWithKeyKeyValueFieldsExtractor() {
        }

        @Override
        public List<DataField> keyFields(TableSchema schema) {
            return ChangelogWithKeyFileStoreTable.addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
        }

        @Override
        public List<DataField> valueFields(TableSchema schema) {
            return schema.fields();
        }
    }
}

