/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.pipeline.fetcher;

import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeChannelBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.ChannelId;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceListener;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceReader;
import com.antgroup.geaflow.shuffle.memory.ShuffleDataManager;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.network.netty.SliceOutputChannelHandler;
import java.io.IOException;

/**
 * Reads the buffers of one slice on behalf of an input channel and counts how
 * many buffers have been handed out so far.
 *
 * <p>The reader registers itself as the {@link PipelineSliceListener} of the
 * underlying {@link PipelineSliceReader} and forwards "data available"
 * notifications to the {@link SliceOutputChannelHandler} it was created with.
 *
 * <p>NOTE(review): the {@code volatile} flags suggest that release may happen on
 * a different thread than reads. Accessors therefore work on a local snapshot
 * of the reader field so that a concurrent release cannot cause a
 * {@link NullPointerException} between a null check and the call. The
 * sequence counter itself is not synchronized.
 */
public class SequenceSliceReader
implements PipelineSliceListener {
    private final ChannelId inputChannelId;
    private final SliceOutputChannelHandler requestHandler;
    private SliceId sliceId;
    // volatile: assigned by createSliceReader and cleared by releaseAllResources,
    // while the other accessors only read it.
    private volatile PipelineSliceReader sliceReader;
    // Starts at -1 so that the first buffer handed out carries sequence number 0.
    private int sequenceNumber = -1;
    private volatile boolean isRegistered = false;
    private volatile boolean isReleased = false;

    /**
     * @param inputChannelId id reported by {@link #getReceiverId()}
     * @param requestHandler handler that is notified when data becomes available
     */
    public SequenceSliceReader(ChannelId inputChannelId, SliceOutputChannelHandler requestHandler) {
        this.inputChannelId = inputChannelId;
        this.requestHandler = requestHandler;
    }

    /**
     * Creates the underlying slice reader through {@link ShuffleDataManager},
     * with this object as its listener, and immediately signals the request
     * handler that this reader may have data.
     *
     * @param sliceId      slice to read
     * @param startBatchId batch id passed through to the created reader
     * @throws IOException declared for callers; propagated from reader creation
     */
    public void createSliceReader(SliceId sliceId, long startBatchId) throws IOException {
        this.sliceId = sliceId;
        this.sliceReader = ShuffleDataManager.getInstance().createSliceReader(sliceId, startBatchId, this);
        this.notifyDataAvailable();
    }

    /** Forwards the notification to the request handler, passing this reader. */
    @Override
    public void notifyDataAvailable() {
        this.requestHandler.notifyNonEmpty(this);
    }

    /**
     * Passes the requested batch id to the underlying reader. The call is a
     * no-op when no reader is set, i.e. before {@link #createSliceReader} or
     * after {@link #releaseAllResources()}.
     *
     * @param batchId batch id to hand to the underlying reader
     */
    public void requestBatch(long batchId) {
        PipelineSliceReader reader = this.sliceReader;
        if (reader != null) {
            reader.updateRequestedBatchId(batchId);
        }
    }

    /**
     * @return {@code true} if a reader is set and it reports more data;
     *         {@code false} otherwise
     */
    public boolean hasNext() {
        // Single read of the field: a concurrent release may null it at any time.
        PipelineSliceReader reader = this.sliceReader;
        return reader != null && reader.hasNext();
    }

    /**
     * Returns the next buffer of the slice and advances the sequence number
     * when a buffer was actually produced.
     *
     * @return the next buffer, or {@code null} if the underlying reader has
     *         none or no reader has been created yet
     * @throws IllegalArgumentException if this reader has already been released
     */
    public PipeChannelBuffer next() {
        // Snapshot before checking the flag: release sets the flag before it
        // clears the field, so a null snapshot caused by release is reported
        // as "released" rather than as a NullPointerException.
        PipelineSliceReader reader = this.sliceReader;
        if (this.isReleased) {
            throw new IllegalArgumentException("slice has been released already: " + this.sliceId);
        }
        if (reader == null) {
            return null;
        }
        PipeChannelBuffer next = reader.next();
        if (next != null) {
            ++this.sequenceNumber;
        }
        return next;
    }

    /** @return number of buffers returned by {@link #next()} so far, minus one */
    public int getSequenceNumber() {
        return this.sequenceNumber;
    }

    /** @return the channel id supplied at construction */
    public ChannelId getReceiverId() {
        return this.inputChannelId;
    }

    /** @param registered new value of the registered flag */
    public void setRegistered(boolean registered) {
        this.isRegistered = registered;
    }

    /** @return current value of the registered flag */
    public boolean isRegistered() {
        return this.isRegistered;
    }

    /**
     * Marks this reader as released and releases the underlying reader, if
     * any. Subsequent calls do nothing.
     *
     * @throws IOException if releasing the underlying reader fails
     */
    public void releaseAllResources() throws IOException {
        if (this.isReleased) {
            return;
        }
        this.isReleased = true;
        PipelineSliceReader reader = this.sliceReader;
        if (reader != null) {
            try {
                reader.release();
            } finally {
                // Drop the reference even when release() throws so that later
                // calls never touch a partially released reader.
                this.sliceReader = null;
            }
        }
    }

    @Override
    public String toString() {
        return "SequenceSliceReader{sliceId=" + this.sliceId + '}';
    }
}

