/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.SerializationUtils;

/**
 * In-memory view of the sink's write state, backed by a Flink operator {@link ListState}.
 *
 * <p>Values are organised as {@code tableName -> key -> List<StateValue>}. Each {@link StateValue}
 * is persisted as a {@code Tuple5} of (table name, key, serialized partition, bucket, value
 * bytes). The state is obtained through {@code getUnionListState}; per Flink's union
 * redistribution contract every subtask is handed the complete list on restore, so the
 * constructor keeps only the entries accepted by the supplied {@link StateValueFilter}.
 */
public class StoreSinkWriteState {
    private final StateValueFilter stateValueFilter;
    private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> listState;
    private final Map<String, Map<String, List<StateValue>>> map;

    /**
     * Registers (or restores) the list state and loads every entry accepted by the filter.
     *
     * @param context state initialization context providing the operator state store
     * @param stateValueFilter decides, per restored entry, whether it is kept by this instance
     * @throws Exception if the state cannot be obtained or read from the state store
     */
    public StoreSinkWriteState(StateInitializationContext context, StateValueFilter stateValueFilter) throws Exception {
        this.stateValueFilter = stateValueFilter;

        // A Class<Tuple5<...>> literal cannot be written directly; the double cast is safe
        // because the class token is only used to instantiate Tuple5 objects.
        @SuppressWarnings("unchecked")
        Class<Tuple5<String, String, byte[], Integer, byte[]>> tupleClass =
                (Class<Tuple5<String, String, byte[], Integer, byte[]>>) (Class<?>) Tuple5.class;

        // Field serializers must follow the tuple layout: tableName, key, partition, bucket, value.
        TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> listStateSerializer =
                new TupleSerializer<>(
                        tupleClass,
                        new TypeSerializer<?>[] {
                            StringSerializer.INSTANCE,
                            StringSerializer.INSTANCE,
                            BytePrimitiveArraySerializer.INSTANCE,
                            IntSerializer.INSTANCE,
                            BytePrimitiveArraySerializer.INSTANCE
                        });
        this.listState =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        "paimon_store_sink_write_state", listStateSerializer));

        this.map = new HashMap<>();
        for (Tuple5<String, String, byte[], Integer, byte[]> tuple : this.listState.get()) {
            BinaryRow partition = SerializationUtils.deserializeBinaryRow(tuple.f2);
            if (!stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
                // Entry is rejected by the filter; do not load it into this instance.
                continue;
            }
            this.map
                    .computeIfAbsent(tuple.f0, k -> new HashMap<>())
                    .computeIfAbsent(tuple.f1, k -> new ArrayList<>())
                    .add(new StateValue(partition, tuple.f3, tuple.f4));
        }
    }

    /** Returns the filter that was applied to the restored entries. */
    public StateValueFilter stateValueFilter() {
        return this.stateValueFilter;
    }

    /**
     * Looks up the values stored for a table and key.
     *
     * @param tableName name of the table the values belong to
     * @param key key of the values within that table
     * @return the stored list (the internal instance, not a copy), or {@code null} if nothing is
     *     stored for the table or for the key
     */
    @Nullable
    public List<StateValue> get(String tableName, String key) {
        Map<String, List<StateValue>> innerMap = this.map.get(tableName);
        return innerMap == null ? null : innerMap.get(key);
    }

    /**
     * Stores the values for a table and key, replacing any list previously stored for them.
     *
     * @param tableName name of the table the values belong to
     * @param key key of the values within that table
     * @param stateValues values to store; the list is kept by reference, not copied
     */
    public void put(String tableName, String key, List<StateValue> stateValues) {
        this.map.computeIfAbsent(tableName, k -> new HashMap<>()).put(key, stateValues);
    }

    /**
     * Flattens the in-memory map into tuples and replaces the content of the list state with
     * them.
     *
     * @throws Exception if updating the list state fails
     */
    public void snapshotState() throws Exception {
        List<Tuple5<String, String, byte[], Integer, byte[]>> list = new ArrayList<>();
        for (Map.Entry<String, Map<String, List<StateValue>>> tables : this.map.entrySet()) {
            for (Map.Entry<String, List<StateValue>> entry : tables.getValue().entrySet()) {
                for (StateValue stateValue : entry.getValue()) {
                    list.add(
                            Tuple5.of(
                                    tables.getKey(),
                                    entry.getKey(),
                                    SerializationUtils.serializeBinaryRow(stateValue.partition()),
                                    stateValue.bucket(),
                                    stateValue.value()));
                }
            }
        }
        this.listState.update(list);
    }

    /** Decides which restored state entries are loaded by a {@link StoreSinkWriteState}. */
    @FunctionalInterface
    public static interface StateValueFilter {
        /**
         * @param tableName table name of the restored entry
         * @param partition deserialized partition of the restored entry
         * @param bucket bucket of the restored entry
         * @return {@code true} to keep the entry, {@code false} to drop it
         */
        public boolean filter(String tableName, BinaryRow partition, int bucket);
    }

    /** A single state entry: a partition, a bucket and an opaque byte payload. */
    public static class StateValue {
        private final BinaryRow partition;
        private final int bucket;
        private final byte[] value;

        /**
         * @param partition partition this value belongs to
         * @param bucket bucket this value belongs to
         * @param value payload bytes; stored by reference, not copied
         */
        public StateValue(BinaryRow partition, int bucket, byte[] value) {
            this.partition = partition;
            this.bucket = bucket;
            this.value = value;
        }

        /** Returns the partition this value belongs to. */
        public BinaryRow partition() {
            return this.partition;
        }

        /** Returns the bucket this value belongs to. */
        public int bucket() {
            return this.bucket;
        }

        /** Returns the payload bytes (the internal array, not a copy). */
        public byte[] value() {
            return this.value;
        }
    }
}

