/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MaxcomputeSourceReader
implements SourceReader<SeaTunnelRow, MaxcomputeSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(MaxcomputeSourceReader.class);
    private final SourceReader.Context context;
    private final Set<MaxcomputeSourceSplit> sourceSplits;
    private Config pluginConfig;
    boolean noMoreSplit;
    private SeaTunnelRowType seaTunnelRowType;

    public MaxcomputeSourceReader(Config pluginConfig, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) {
        this.pluginConfig = pluginConfig;
        this.context = context;
        this.sourceSplits = new HashSet<MaxcomputeSourceSplit>();
        this.seaTunnelRowType = seaTunnelRowType;
    }

    public void open() {
    }

    public void close() {
    }

    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        this.sourceSplits.forEach(source -> {
            try {
                Record record;
                TableTunnel.DownloadSession session = MaxcomputeUtil.getDownloadSession(this.pluginConfig);
                TunnelRecordReader recordReader = session.openRecordReader(source.getSplitId(), source.getRowNum());
                log.info("open record reader success");
                while ((record = recordReader.read()) != null) {
                    SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, this.seaTunnelRowType);
                    output.collect((Object)seaTunnelRow);
                }
                recordReader.close();
            }
            catch (Exception e) {
                throw new MaxcomputeConnectorException((SeaTunnelErrorCode)CommonErrorCode.READER_OPERATION_FAILED, (Throwable)e);
            }
        });
        if (this.noMoreSplit && Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) {
            log.info("Closed the bounded Maxcompute source");
            this.context.signalNoMoreElement();
        }
    }

    public List<MaxcomputeSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<MaxcomputeSourceSplit>(this.sourceSplits);
    }

    public void addSplits(List<MaxcomputeSourceSplit> splits) {
        this.sourceSplits.addAll(splits);
    }

    public void handleNoMoreSplits() {
        this.noMoreSplit = true;
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }
}

