/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.source;

import com.alibaba.ververica.connectors.common.source.AbstractParallelSourceBase;
import java.io.Serializable;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractParallelSource<T, CURSOR extends Serializable>
extends AbstractParallelSourceBase<T, CURSOR>
implements ListCheckpointed<Tuple2<InputSplit, CURSOR>> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractParallelSource.class);
    private static final long serialVersionUID = -7848357196819780804L;

    public List<Tuple2<InputSplit, CURSOR>> snapshotState(long checkpointId, long timestamp) throws Exception {
        LinkedList<Tuple2<InputSplit, CURSOR>> state = new LinkedList<Tuple2<InputSplit, CURSOR>>();
        if (this.disableParallelRead) {
            return state;
        }
        HashSet<InputSplit> partitionWithState = new HashSet<InputSplit>();
        for (Map.Entry entry : this.parallelReader.getProgress().getProgress().entrySet()) {
            state.add(Tuple2.of((Object)entry.getKey(), (Object)((Serializable)entry.getValue())));
            partitionWithState.add(entry.getKey());
        }
        for (Tuple2 tuple2 : this.initialProgress) {
            if (partitionWithState.contains(tuple2.f0)) continue;
            state.add(tuple2);
        }
        return state;
    }

    public void restoreState(List<Tuple2<InputSplit, CURSOR>> state) throws Exception {
        LOG.info("Restoring state: {}", state);
        this.recoveryFromState = true;
        if (state != null && !state.isEmpty()) {
            this.initialProgress = state;
        }
    }
}

