/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;

public class PartialUpdateMergeFunction
implements MergeFunction<KeyValue> {
    public static final String SEQUENCE_GROUP = "sequence-group";
    private final InternalRow.FieldGetter[] getters;
    private final boolean ignoreDelete;
    private final Map<Integer, FieldsComparator> fieldSeqComparators;
    private final boolean fieldSequenceEnabled;
    private final Map<Integer, FieldAggregator> fieldAggregators;
    private final boolean removeRecordOnDelete;
    private InternalRow currentKey;
    private long latestSequenceNumber;
    private GenericRow row;
    private KeyValue reused;

    protected PartialUpdateMergeFunction(InternalRow.FieldGetter[] getters, boolean ignoreDelete, Map<Integer, FieldsComparator> fieldSeqComparators, Map<Integer, FieldAggregator> fieldAggregators, boolean fieldSequenceEnabled, boolean removeRecordOnDelete) {
        this.getters = getters;
        this.ignoreDelete = ignoreDelete;
        this.fieldSeqComparators = fieldSeqComparators;
        this.fieldAggregators = fieldAggregators;
        this.fieldSequenceEnabled = fieldSequenceEnabled;
        this.removeRecordOnDelete = removeRecordOnDelete;
    }

    @Override
    public void reset() {
        this.currentKey = null;
        this.row = new GenericRow(this.getters.length);
        this.fieldAggregators.values().forEach(FieldAggregator::reset);
    }

    @Override
    public void add(KeyValue kv) {
        this.currentKey = kv.key();
        if (kv.valueKind().isRetract()) {
            if (this.ignoreDelete) {
                return;
            }
            if (this.fieldSequenceEnabled) {
                this.retractWithSequenceGroup(kv);
                return;
            }
            if (this.removeRecordOnDelete) {
                if (kv.valueKind() == RowKind.DELETE) {
                    this.row = null;
                }
                return;
            }
            String msg = String.join((CharSequence)"\n", "By default, Partial update can not accept delete records, you can choose one of the following solutions:", "1. Configure 'ignore-delete' to ignore delete records.", "2. Configure 'sequence-group's to retract partial columns.");
            throw new IllegalArgumentException(msg);
        }
        this.latestSequenceNumber = kv.sequenceNumber();
        if (this.fieldSeqComparators.isEmpty()) {
            this.updateNonNullFields(kv);
        } else {
            this.updateWithSequenceGroup(kv);
        }
    }

    private void updateNonNullFields(KeyValue kv) {
        for (int i = 0; i < this.getters.length; ++i) {
            Object field = this.getters[i].getFieldOrNull(kv.value());
            if (field == null) continue;
            this.row.setField(i, field);
        }
    }

    private void updateWithSequenceGroup(KeyValue kv) {
        for (int i = 0; i < this.getters.length; ++i) {
            Object field = this.getters[i].getFieldOrNull(kv.value());
            FieldsComparator seqComparator = this.fieldSeqComparators.get(i);
            FieldAggregator aggregator = this.fieldAggregators.get(i);
            Object accumulator = this.getters[i].getFieldOrNull(this.row);
            if (seqComparator == null) {
                if (aggregator != null) {
                    this.row.setField(i, aggregator.agg(accumulator, field));
                    continue;
                }
                if (field == null) continue;
                this.row.setField(i, field);
                continue;
            }
            if (this.isEmptySequenceGroup(kv, seqComparator)) continue;
            if (seqComparator.compare(kv.value(), this.row) >= 0) {
                int index = i;
                if (Arrays.stream(seqComparator.compareFields()).anyMatch(seqIndex -> seqIndex == index)) {
                    for (int fieldIndex : seqComparator.compareFields()) {
                        this.row.setField(fieldIndex, this.getters[fieldIndex].getFieldOrNull(kv.value()));
                    }
                }
                this.row.setField(i, aggregator == null ? field : aggregator.agg(accumulator, field));
                continue;
            }
            if (aggregator == null) continue;
            this.row.setField(i, aggregator.aggReversed(accumulator, field));
        }
    }

    private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) {
        for (int fieldIndex : comparator.compareFields()) {
            if (this.getters[fieldIndex].getFieldOrNull(kv.value()) == null) continue;
            return false;
        }
        return true;
    }

    private void retractWithSequenceGroup(KeyValue kv) {
        HashSet<Integer> updatedSequenceFields = new HashSet<Integer>();
        for (int i = 0; i < this.getters.length; ++i) {
            FieldsComparator seqComparator = this.fieldSeqComparators.get(i);
            if (seqComparator == null) continue;
            FieldAggregator aggregator = this.fieldAggregators.get(i);
            if (this.isEmptySequenceGroup(kv, seqComparator)) continue;
            if (seqComparator.compare(kv.value(), this.row) >= 0) {
                int index = i;
                if (Arrays.stream(seqComparator.compareFields()).anyMatch(field -> field == index)) {
                    for (int field2 : seqComparator.compareFields()) {
                        if (updatedSequenceFields.contains(field2)) continue;
                        this.row.setField(field2, this.getters[field2].getFieldOrNull(kv.value()));
                        updatedSequenceFields.add(field2);
                    }
                    continue;
                }
                if (aggregator == null) {
                    this.row.setField(i, null);
                    continue;
                }
                Object accumulator = this.getters[i].getFieldOrNull(this.row);
                this.row.setField(i, aggregator.retract(accumulator, this.getters[i].getFieldOrNull(kv.value())));
                continue;
            }
            if (aggregator == null) continue;
            Object accumulator = this.getters[i].getFieldOrNull(this.row);
            this.row.setField(i, aggregator.retract(accumulator, this.getters[i].getFieldOrNull(kv.value())));
        }
    }

    @Override
    public KeyValue getResult() {
        if (this.reused == null) {
            this.reused = new KeyValue();
        }
        if (this.removeRecordOnDelete && this.row == null) {
            return null;
        }
        return this.reused.replace(this.currentKey, this.latestSequenceNumber, RowKind.INSERT, this.row);
    }

    public static MergeFunctionFactory<KeyValue> factory(Options options, RowType rowType, List<String> primaryKeys) {
        return new Factory(options, rowType, primaryKeys);
    }

    private static class Factory
    implements MergeFunctionFactory<KeyValue> {
        private static final long serialVersionUID = 1L;
        private final boolean ignoreDelete;
        private final RowType rowType;
        private final List<DataType> tableTypes;
        private final Map<Integer, Supplier<FieldsComparator>> fieldSeqComparators;
        private final Map<Integer, Supplier<FieldAggregator>> fieldAggregators;
        private final boolean removeRecordOnDelete;

        private Factory(Options options, RowType rowType, List<String> primaryKeys) {
            this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
            this.rowType = rowType;
            this.tableTypes = rowType.getFieldTypes();
            List<String> fieldNames = rowType.getFieldNames();
            this.fieldSeqComparators = new HashMap<Integer, Supplier<FieldsComparator>>();
            for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
                String k = entry.getKey();
                String v = entry.getValue();
                if (!k.startsWith("fields") || !k.endsWith(PartialUpdateMergeFunction.SEQUENCE_GROUP)) continue;
                List<String> sequenceFields = Arrays.stream(k.substring("fields".length() + 1, k.length() - PartialUpdateMergeFunction.SEQUENCE_GROUP.length() - 1).split(",")).map(fieldName -> this.validateFieldName((String)fieldName, fieldNames)).collect(Collectors.toList());
                Supplier<FieldsComparator> userDefinedSeqComparator = () -> UserDefinedSeqComparator.create(rowType, sequenceFields);
                Arrays.stream(v.split(",")).map(fieldName -> fieldNames.indexOf(this.validateFieldName((String)fieldName, fieldNames))).forEach(field -> {
                    if (this.fieldSeqComparators.containsKey(field)) {
                        throw new IllegalArgumentException(String.format("Field %s is defined repeatedly by multiple groups: %s", fieldNames.get((int)field), k));
                    }
                    this.fieldSeqComparators.put((Integer)field, userDefinedSeqComparator);
                });
                sequenceFields.forEach(fieldName -> {
                    int index = fieldNames.indexOf(fieldName);
                    this.fieldSeqComparators.put(index, userDefinedSeqComparator);
                });
            }
            this.fieldAggregators = this.createFieldAggregators(rowType, primaryKeys, new CoreOptions(options));
            if (!this.fieldAggregators.isEmpty() && this.fieldSeqComparators.isEmpty()) {
                throw new IllegalArgumentException("Must use sequence group for aggregation functions.");
            }
            this.removeRecordOnDelete = options.get(CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
            Preconditions.checkState(!this.removeRecordOnDelete || !this.ignoreDelete, String.format("%s and %s have conflicting behavior so should not be enabled at the same time.", CoreOptions.IGNORE_DELETE, CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
            Preconditions.checkState(!this.removeRecordOnDelete || this.fieldSeqComparators.isEmpty(), String.format("sequence group and %s have conflicting behavior so should not be enabled at the same time.", CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
        }

        @Override
        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
            if (projection != null) {
                HashMap<Integer, FieldsComparator> projectedSeqComparators = new HashMap<Integer, FieldsComparator>();
                HashMap<Integer, FieldAggregator> projectedAggregators = new HashMap<Integer, FieldAggregator>();
                int[] projects = Projection.of(projection).toTopLevelIndexes();
                HashMap<Integer, Integer> indexMap = new HashMap<Integer, Integer>();
                List<DataField> dataFields = this.rowType.getFields();
                ArrayList<DataType> newDataTypes = new ArrayList<DataType>();
                for (int i = 0; i < projects.length; ++i) {
                    indexMap.put(projects[i], i);
                    newDataTypes.add(dataFields.get(projects[i]).type());
                }
                RowType newRowType = RowType.builder().fields(newDataTypes).build();
                this.fieldSeqComparators.forEach((field, comparatorSupplier) -> {
                    FieldsComparator comparator = (FieldsComparator)comparatorSupplier.get();
                    int newField = indexMap.getOrDefault(field, -1);
                    if (newField != -1) {
                        int[] newSequenceFields = Arrays.stream(comparator.compareFields()).map(index -> {
                            int newIndex = indexMap.getOrDefault(index, -1);
                            if (newIndex == -1) {
                                throw new RuntimeException(String.format("Can not find new sequence field for new field. new field index is %s", newField));
                            }
                            return newIndex;
                        }).toArray();
                        projectedSeqComparators.put(newField, UserDefinedSeqComparator.create(newRowType, newSequenceFields));
                    }
                });
                for (int i = 0; i < projects.length; ++i) {
                    if (!this.fieldAggregators.containsKey(projects[i])) continue;
                    projectedAggregators.put(i, this.fieldAggregators.get(projects[i]).get());
                }
                return new PartialUpdateMergeFunction(InternalRowUtils.createFieldGetters(Projection.of(projection).project(this.tableTypes)), this.ignoreDelete, projectedSeqComparators, projectedAggregators, !this.fieldSeqComparators.isEmpty(), this.removeRecordOnDelete);
            }
            HashMap<Integer, FieldsComparator> fieldSeqComparators = new HashMap<Integer, FieldsComparator>();
            this.fieldSeqComparators.forEach((f, supplier) -> {
                FieldsComparator cfr_ignored_0 = (FieldsComparator)fieldSeqComparators.put((Integer)f, (FieldsComparator)supplier.get());
            });
            HashMap<Integer, FieldAggregator> fieldAggregators = new HashMap<Integer, FieldAggregator>();
            this.fieldAggregators.forEach((f, supplier) -> {
                FieldAggregator cfr_ignored_0 = (FieldAggregator)fieldAggregators.put((Integer)f, (FieldAggregator)supplier.get());
            });
            return new PartialUpdateMergeFunction(InternalRowUtils.createFieldGetters(this.tableTypes), this.ignoreDelete, fieldSeqComparators, fieldAggregators, !fieldSeqComparators.isEmpty(), this.removeRecordOnDelete);
        }

        @Override
        public MergeFunctionFactory.AdjustedProjection adjustProjection(@Nullable int[][] projection) {
            if (this.fieldSeqComparators.isEmpty()) {
                return new MergeFunctionFactory.AdjustedProjection(projection, null);
            }
            if (projection == null) {
                return new MergeFunctionFactory.AdjustedProjection(null, null);
            }
            LinkedHashSet<Integer> extraFields = new LinkedHashSet<Integer>();
            int[] topProjects = Projection.of(projection).toTopLevelIndexes();
            Set indexSet = Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
            for (int index : topProjects) {
                Supplier<FieldsComparator> comparatorSupplier = this.fieldSeqComparators.get(index);
                if (comparatorSupplier == null) continue;
                FieldsComparator comparator = comparatorSupplier.get();
                for (int field : comparator.compareFields()) {
                    if (indexSet.contains(field)) continue;
                    extraFields.add(field);
                }
            }
            int[] allProjects = Stream.concat(Arrays.stream(topProjects).boxed(), extraFields.stream()).mapToInt(Integer::intValue).toArray();
            int[][] pushDown = Projection.of(allProjects).toNestedIndexes();
            int[][] outer = Projection.of(IntStream.range(0, topProjects.length).toArray()).toNestedIndexes();
            return new MergeFunctionFactory.AdjustedProjection(pushDown, outer);
        }

        private String validateFieldName(String fieldName, List<String> fieldNames) {
            int field = fieldNames.indexOf(fieldName);
            if (field == -1) {
                throw new IllegalArgumentException(String.format("Field %s can not be found in table schema", fieldName));
            }
            return fieldName;
        }

        private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(RowType rowType, List<String> primaryKeys, CoreOptions options) {
            List<String> fieldNames = rowType.getFieldNames();
            List<DataType> fieldTypes = rowType.getFieldTypes();
            HashMap<Integer, Supplier<FieldAggregator>> fieldAggregators = new HashMap<Integer, Supplier<FieldAggregator>>();
            String defaultAggFunc = options.fieldsDefaultFunc();
            for (int i = 0; i < fieldNames.size(); ++i) {
                String fieldName = fieldNames.get(i);
                DataType fieldType = fieldTypes.get(i);
                boolean isPrimaryKey = primaryKeys.contains(fieldName);
                String strAggFunc = options.fieldAggFunc(fieldName);
                boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);
                if (strAggFunc != null) {
                    fieldAggregators.put(i, () -> FieldAggregator.createFieldAggregator(fieldType, strAggFunc, ignoreRetract, isPrimaryKey, options, fieldName));
                    continue;
                }
                if (defaultAggFunc == null) continue;
                fieldAggregators.put(i, () -> FieldAggregator.createFieldAggregator(fieldType, defaultAggFunc, ignoreRetract, isPrimaryKey, options, fieldName));
            }
            return fieldAggregators;
        }
    }
}

