/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.pipeline.fetcher;

import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeChannelBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipelineSlice;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceListener;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Reads {@link PipeBuffer}s out of a {@link PipelineSlice} up to a requested batch id.
 *
 * <p>The reader works in one of two modes, fixed at construction time:
 * <ul>
 *   <li><b>disposable</b>: buffers are pulled directly through {@code slice.next()} and
 *       availability is checked through {@code slice.hasNext()};</li>
 *   <li><b>non-disposable</b>: buffers are read through the iterator returned by
 *       {@code slice.getBufferIterator()}, bounded by the buffer count the slice reported
 *       when this reader was created.</li>
 * </ul>
 *
 * <p>{@code requestBatchId} and {@code consumedBatchId} are volatile, so they may be observed
 * from other threads; the message counters are plain fields.
 * NOTE(review): presumably {@link #next()} / {@link #hasNext()} are driven by a single thread
 * while {@link #updateRequestedBatchId(long)} / {@link #notifyAvailable(long)} may be called
 * from others — confirm against callers.
 */
public class PipelineSliceReader {
    private final PipelineSlice slice;
    private final PipelineSliceListener listener;
    private final AtomicBoolean released;
    private final boolean disposable;
    /** {@code null} in disposable mode; otherwise the slice's buffer iterator. */
    private final Iterator<PipeBuffer> sliceIterator;
    /** Buffer count reported by the slice at construction (non-disposable mode only). */
    private int totalMessages;
    /** Number of non-null buffers handed out from {@link #sliceIterator}. */
    private int consumedMessages;
    /** Highest batch id the consumer asked for; {@code -1} disables the batch bound. */
    private volatile long requestBatchId;
    /** Batch id of the last non-data buffer returned by {@link #next()}; {@code -1} initially. */
    private volatile long consumedBatchId;

    /**
     * Creates a reader over the given slice.
     *
     * @param slice        the slice to read from
     * @param startBatchId initial value of the requested batch id ({@code -1} means unbounded)
     * @param disposable   {@code true} to consume buffers directly from the slice,
     *                     {@code false} to read through the slice's buffer iterator
     * @param listener     notified by {@link #notifyAvailable(long)} when data may be read
     */
    public PipelineSliceReader(PipelineSlice slice, long startBatchId, boolean disposable, PipelineSliceListener listener) {
        this.slice = slice;
        this.listener = listener;
        this.released = new AtomicBoolean();
        this.disposable = disposable;
        this.sliceIterator = disposable ? null : slice.getBufferIterator();
        if (!disposable) {
            this.totalMessages = slice.getCurrentNumberOfBuffers();
            this.consumedMessages = 0;
        }
        this.consumedBatchId = -1L;
        this.requestBatchId = startBatchId;
    }

    /**
     * Tells whether another buffer can be read for the currently requested batch.
     *
     * @return {@code true} if the requested batch is not yet fully consumed and the underlying
     *         source (slice or iterator snapshot) still has buffers
     * @throws IllegalStateException if this reader or its slice has already been released
     */
    public boolean hasNext() {
        if (this.isReleased()) {
            throw new IllegalStateException("slice has been released already: " + this.slice.getSliceId());
        }
        return this.hasBatch() && (this.disposable ? this.slice.hasNext() : this.consumedMessages < this.totalMessages);
    }

    /**
     * Returns the next buffer wrapped together with a "more available" flag.
     *
     * @return the next buffer, or {@code null} if the requested batch is already consumed or
     *         the source yielded no buffer
     * @throws IllegalStateException if a buffer was obtained but the reader or slice turns out
     *         to be released when the "more available" flag is computed via {@link #hasNext()}
     */
    public PipeChannelBuffer next() {
        if (!this.hasBatch()) {
            return null;
        }
        PipeBuffer record;
        if (this.sliceIterator != null && this.sliceIterator.hasNext()) {
            record = this.sliceIterator.next();
            if (record != null) {
                ++this.consumedMessages;
            }
        } else {
            // Reached in disposable mode, and also in non-disposable mode once the iterator
            // is exhausted (original behavior, preserved as-is).
            record = this.slice.next();
        }
        if (record == null) {
            return null;
        }
        if (!record.isData()) {
            // A non-data buffer carries the batch id that has just been completed.
            this.consumedBatchId = record.getBatchId();
        }
        return new PipeChannelBuffer(record, this.hasNext());
    }

    /**
     * Replaces the requested batch id.
     *
     * @param batchId new upper bound for consumption; {@code -1} removes the bound
     */
    public void updateRequestedBatchId(long batchId) {
        this.requestBatchId = batchId;
    }

    /**
     * Forwards a data-available signal to the listener when the given batch is within the
     * requested range (or no bound is set).
     *
     * @param batchId batch id for which data became available
     */
    public void notifyAvailable(long batchId) {
        // Read the volatile once so a concurrent updateRequestedBatchId() cannot make the two
        // halves of the condition see different values.
        long requested = this.requestBatchId;
        if (requested == -1L || batchId <= requested) {
            this.listener.notifyDataAvailable();
        }
    }

    /**
     * @return {@code true} if no batch bound is set or the last consumed batch id is still
     *         below the requested one
     */
    private boolean hasBatch() {
        // Single snapshot of the volatile; see notifyAvailable() for the rationale.
        long requested = this.requestBatchId;
        return requested == -1L || this.consumedBatchId < requested;
    }

    /**
     * Releases this reader at most once. On the first call the slice is asked via
     * {@code disposeIfNotNeed()} and, if that returns {@code true}, {@code slice.release()}
     * is invoked. Subsequent calls do nothing.
     */
    public void release() {
        if (this.released.compareAndSet(false, true) && this.slice.disposeIfNotNeed()) {
            this.slice.release();
        }
    }

    /**
     * @return {@code true} if {@link #release()} was called on this reader or the slice itself
     *         reports being released
     */
    public boolean isReleased() {
        return this.released.get() || this.slice.isReleased();
    }

    /**
     * @return the mode flag passed to the constructor
     */
    public boolean isDisposable() {
        return this.disposable;
    }
}

