/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.ListState;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.utils.Preconditions;

public class SplitListState<T>
implements ListState<T> {
    private final String splitPrefix;
    private final List<T> values;
    private final Function<T, String> serializer;
    private final Function<String, T> deserializer;

    public SplitListState(String identifier, Function<T, String> serializer, Function<String, T> deserializer) {
        Preconditions.checkArgument(!Character.isDigit(identifier.charAt(0)), String.format("Identifier %s should not start with digits.", identifier));
        this.splitPrefix = identifier.length() + identifier;
        this.serializer = serializer;
        this.deserializer = deserializer;
        this.values = new ArrayList<T>();
    }

    public void add(T value) {
        this.values.add(value);
    }

    public List<T> get() {
        return new ArrayList<T>(this.values);
    }

    public void update(List<T> values2) {
        this.values.clear();
        this.values.addAll(values2);
    }

    public void addAll(List<T> values2) throws Exception {
        this.values.addAll(values2);
    }

    public void clear() {
        this.values.clear();
    }

    public List<SimpleSourceSplit> snapshotState() {
        return this.values.stream().map(x -> new SimpleSourceSplit(this.splitPrefix + this.serializer.apply(x))).collect(Collectors.toList());
    }

    public void restoreState(List<SimpleSourceSplit> splits) {
        this.values.clear();
        splits.stream().map(SimpleSourceSplit::value).filter(x -> x.startsWith(this.splitPrefix)).map(x -> x.substring(this.splitPrefix.length())).map(this.deserializer).forEach(this.values::add);
    }
}

