/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree.compact;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.Projection;

public class PartialUpdateMergeFunction
implements MergeFunction<KeyValue> {
    public static final String SEQUENCE_GROUP = "sequence-group";
    private final InternalRow.FieldGetter[] getters;
    private final boolean ignoreDelete;
    private final Map<Integer, SequenceGenerator> fieldSequences;
    private final Map<Integer, FieldAggregator> fieldAggregators;
    private InternalRow currentKey;
    private long latestSequenceNumber;
    private boolean isEmpty;
    private GenericRow row;
    private KeyValue reused;

    protected PartialUpdateMergeFunction(InternalRow.FieldGetter[] getters, boolean ignoreDelete, Map<Integer, SequenceGenerator> fieldSequences, Map<Integer, FieldAggregator> fieldAggregators) {
        this.getters = getters;
        this.ignoreDelete = ignoreDelete;
        this.fieldSequences = fieldSequences;
        this.fieldAggregators = fieldAggregators;
    }

    @Override
    public void reset() {
        this.currentKey = null;
        this.row = new GenericRow(this.getters.length);
        this.fieldAggregators.values().forEach(FieldAggregator::reset);
        this.isEmpty = true;
    }

    @Override
    public void add(KeyValue kv) {
        this.currentKey = kv.key();
        if (kv.valueKind().isRetract()) {
            if (this.ignoreDelete) {
                return;
            }
            if (this.fieldSequences.size() > 1) {
                this.retractWithSequenceGroup(kv);
                return;
            }
            String msg = String.join((CharSequence)"\n", "By default, Partial update can not accept delete records, you can choose one of the following solutions:", "1. Configure 'partial-update.ignore-delete' to ignore delete records.", "2. Configure 'sequence-group's to retract partial columns.");
            throw new IllegalArgumentException(msg);
        }
        this.latestSequenceNumber = kv.sequenceNumber();
        this.isEmpty = false;
        if (this.fieldSequences.isEmpty()) {
            this.updateNonNullFields(kv);
        } else {
            this.updateWithSequenceGroup(kv);
        }
    }

    private void updateNonNullFields(KeyValue kv) {
        for (int i = 0; i < this.getters.length; ++i) {
            Object field = this.getters[i].getFieldOrNull(kv.value());
            if (field == null) continue;
            this.row.setField(i, field);
        }
    }

    private void updateWithSequenceGroup(KeyValue kv) {
        for (int i = 0; i < this.getters.length; ++i) {
            Object field = this.getters[i].getFieldOrNull(kv.value());
            SequenceGenerator sequenceGen = this.fieldSequences.get(i);
            FieldAggregator aggregator = this.fieldAggregators.get(i);
            Object accumulator = this.getters[i].getFieldOrNull(this.row);
            if (sequenceGen == null) {
                if (aggregator != null) {
                    this.row.setField(i, aggregator.agg(accumulator, field));
                    continue;
                }
                if (field == null) continue;
                this.row.setField(i, field);
                continue;
            }
            Long currentSeq = sequenceGen.generateNullable(kv.value());
            if (currentSeq == null) continue;
            Long previousSeq = sequenceGen.generateNullable(this.row);
            if (previousSeq == null || currentSeq >= previousSeq) {
                this.row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));
                continue;
            }
            if (aggregator == null) continue;
            this.row.setField(i, aggregator.agg(field, accumulator));
        }
    }

    private void retractWithSequenceGroup(KeyValue kv) {
        for (int i = 0; i < this.getters.length; ++i) {
            Object accumulator;
            Long currentSeq;
            SequenceGenerator sequenceGen = this.fieldSequences.get(i);
            if (sequenceGen == null || (currentSeq = sequenceGen.generateNullable(kv.value())) == null) continue;
            Long previousSeq = sequenceGen.generateNullable(this.row);
            FieldAggregator aggregator = this.fieldAggregators.get(i);
            if (previousSeq == null || currentSeq >= previousSeq) {
                if (sequenceGen.index() == i) {
                    this.row.setField(i, this.getters[i].getFieldOrNull(kv.value()));
                    continue;
                }
                if (aggregator == null) {
                    this.row.setField(i, null);
                    continue;
                }
                accumulator = this.getters[i].getFieldOrNull(this.row);
                this.row.setField(i, aggregator.retract(accumulator, this.getters[i].getFieldOrNull(kv.value())));
                continue;
            }
            if (aggregator == null) continue;
            accumulator = this.getters[i].getFieldOrNull(this.row);
            this.row.setField(i, aggregator.retract(accumulator, this.getters[i].getFieldOrNull(kv.value())));
        }
    }

    @Override
    @Nullable
    public KeyValue getResult() {
        if (this.isEmpty) {
            return null;
        }
        if (this.reused == null) {
            this.reused = new KeyValue();
        }
        return this.reused.replace(this.currentKey, this.latestSequenceNumber, RowKind.INSERT, this.row);
    }

    public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {
        return new Factory(options, rowType, primaryKeys);
    }

    private static class Factory
    implements MergeFunctionFactory<KeyValue> {
        private static final long serialVersionUID = 1L;
        private final boolean ignoreDelete;
        private final List<DataType> tableTypes;
        private final Map<Integer, SequenceGenerator> fieldSequences;
        private final Map<Integer, FieldAggregator> fieldAggregators;

        private Factory(Options options, RowType rowType, List<String> primaryKeys) {
            this.ignoreDelete = options.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE);
            this.tableTypes = rowType.getFieldTypes();
            List<String> fieldNames = rowType.getFieldNames();
            this.fieldSequences = new HashMap<Integer, SequenceGenerator>();
            for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
                String k = entry.getKey();
                String v = entry.getValue();
                if (!k.startsWith("fields") || !k.endsWith(PartialUpdateMergeFunction.SEQUENCE_GROUP)) continue;
                String sequenceFieldName = k.substring("fields".length() + 1, k.length() - PartialUpdateMergeFunction.SEQUENCE_GROUP.length() - 1);
                SequenceGenerator sequenceGen = new SequenceGenerator(sequenceFieldName, rowType);
                Arrays.stream(v.split(",")).map(fieldName -> {
                    int field = fieldNames.indexOf(fieldName);
                    if (field == -1) {
                        throw new IllegalArgumentException(String.format("Field %s can not be found in table schema", fieldName));
                    }
                    return field;
                }).forEach(field -> {
                    if (this.fieldSequences.containsKey(field)) {
                        throw new IllegalArgumentException(String.format("Field %s is defined repeatedly by multiple groups: %s", fieldNames.get((int)field), k));
                    }
                    this.fieldSequences.put((Integer)field, sequenceGen);
                });
                this.fieldSequences.put(sequenceGen.index(), sequenceGen);
            }
            this.fieldAggregators = this.createFieldAggregators(rowType, primaryKeys, new CoreOptions(options));
            if (this.fieldAggregators.size() > 0 && this.fieldSequences.isEmpty()) {
                throw new IllegalArgumentException("Must use sequence group for aggregation functions.");
            }
        }

        @Override
        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
            if (projection != null) {
                int i;
                HashMap<Integer, SequenceGenerator> projectedSequences = new HashMap<Integer, SequenceGenerator>();
                HashMap<Integer, FieldAggregator> projectedAggregators = new HashMap<Integer, FieldAggregator>();
                int[] projects = Projection.of(projection).toTopLevelIndexes();
                HashMap<Integer, Integer> indexMap = new HashMap<Integer, Integer>();
                for (i = 0; i < projects.length; ++i) {
                    indexMap.put(projects[i], i);
                }
                this.fieldSequences.forEach((field, sequence) -> {
                    int newField = indexMap.getOrDefault(field, -1);
                    if (newField != -1) {
                        int newSequenceId = indexMap.getOrDefault(sequence.index(), -1);
                        if (newSequenceId == -1) {
                            throw new RuntimeException(String.format("Can not find new sequence field for new field. new field index is %s", newField));
                        }
                        projectedSequences.put(newField, new SequenceGenerator(newSequenceId, sequence.fieldType()));
                    }
                });
                for (i = 0; i < projects.length; ++i) {
                    if (!this.fieldAggregators.containsKey(projects[i])) continue;
                    projectedAggregators.put(i, this.fieldAggregators.get(projects[i]));
                }
                return new PartialUpdateMergeFunction(InternalRowUtils.createFieldGetters(Projection.of(projection).project(this.tableTypes)), this.ignoreDelete, projectedSequences, projectedAggregators);
            }
            return new PartialUpdateMergeFunction(InternalRowUtils.createFieldGetters(this.tableTypes), this.ignoreDelete, this.fieldSequences, this.fieldAggregators);
        }

        @Override
        public MergeFunctionFactory.AdjustedProjection adjustProjection(@Nullable int[][] projection) {
            if (this.fieldSequences.isEmpty()) {
                return new MergeFunctionFactory.AdjustedProjection(projection, null);
            }
            if (projection == null) {
                return new MergeFunctionFactory.AdjustedProjection(null, null);
            }
            LinkedHashSet<Integer> extraFields = new LinkedHashSet<Integer>();
            int[] topProjects = Projection.of(projection).toTopLevelIndexes();
            Set indexSet = Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
            for (int index : topProjects) {
                SequenceGenerator generator = this.fieldSequences.get(index);
                if (generator == null || indexSet.contains(generator.index())) continue;
                extraFields.add(generator.index());
            }
            int[] allProjects = Stream.concat(Arrays.stream(topProjects).boxed(), extraFields.stream()).mapToInt(Integer::intValue).toArray();
            int[][] pushdown = Projection.of(allProjects).toNestedIndexes();
            int[][] outer = Projection.of(IntStream.range(0, topProjects.length).toArray()).toNestedIndexes();
            return new MergeFunctionFactory.AdjustedProjection(pushdown, outer);
        }

        private Map<Integer, FieldAggregator> createFieldAggregators(RowType rowType, List<String> primaryKeys, CoreOptions options) {
            List<String> fieldNames = rowType.getFieldNames();
            List<DataType> fieldTypes = rowType.getFieldTypes();
            HashMap<Integer, FieldAggregator> fieldAggregators = new HashMap<Integer, FieldAggregator>();
            for (int i = 0; i < fieldNames.size(); ++i) {
                String fieldName = fieldNames.get(i);
                DataType fieldType = fieldTypes.get(i);
                boolean isPrimaryKey = primaryKeys.contains(fieldName);
                String strAggFunc = options.fieldAggFunc(fieldName);
                boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);
                if (strAggFunc == null) continue;
                fieldAggregators.put(i, FieldAggregator.createFieldAggregator(fieldType, strAggFunc, ignoreRetract, isPrimaryKey, options, fieldName));
            }
            return fieldAggregators;
        }
    }
}

