/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.pipeline.buffer;

import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceListener;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceReader;
import com.antgroup.geaflow.shuffle.memory.ShuffleDataManager;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineSlice {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipelineSlice.class);
    private final SliceId sliceId;
    private final String taskLogTag;
    private final ArrayDeque<PipeBuffer> buffers;
    private PipelineSliceReader sliceReader;
    private int refCount;
    private volatile boolean isReleased;
    private boolean flushRequested;

    public PipelineSlice(String taskLogTag, SliceId sliceId) {
        this(taskLogTag, sliceId, 1);
    }

    public PipelineSlice(String taskLogTag, SliceId sliceId, int refCount) {
        this.sliceId = sliceId;
        this.taskLogTag = taskLogTag;
        this.refCount = refCount;
        this.buffers = new ArrayDeque();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean add(PipeBuffer recordBuffer) {
        boolean notifyDataAvailable;
        ArrayDeque<PipeBuffer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.isReleased) {
                return false;
            }
            this.buffers.add(recordBuffer);
            notifyDataAvailable = this.shouldNotifyDataAvailable();
        }
        if (notifyDataAvailable) {
            this.notifyDataAvailable(recordBuffer.getBatchId());
        }
        return true;
    }

    private boolean shouldNotifyDataAvailable() {
        return this.sliceReader != null && !this.flushRequested && this.getCurrentNumberOfBuffers() == 1;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flush() {
        boolean needNotify;
        long batchId;
        ArrayDeque<PipeBuffer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.buffers.isEmpty()) {
                return;
            }
            batchId = this.buffers.peekLast().getBatchId();
            needNotify = !this.flushRequested && this.buffers.size() == 1;
            this.updateFlushRequested(this.flushRequested || this.buffers.size() > 1 || needNotify);
        }
        if (needNotify) {
            this.notifyDataAvailable(batchId);
        }
    }

    private void notifyDataAvailable(long batchId) {
        PipelineSliceReader reader = this.sliceReader;
        if (reader != null) {
            reader.notifyAvailable(batchId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setRefCount(int refCount) {
        ArrayDeque<PipeBuffer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            this.refCount = refCount;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PipelineSliceReader createSliceReader(long startBatchId, PipelineSliceListener listener) {
        ArrayDeque<PipeBuffer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.isReleased) {
                throw new GeaflowRuntimeException("slice is released:" + this.sliceId);
            }
            --this.refCount;
            this.sliceReader = new PipelineSliceReader(this, startBatchId, this.refCount < 1, listener);
            LOGGER.info("creating reader for {} {} with startBatch:{} refCount:{} disposable:{}", new Object[]{this.taskLogTag, this.sliceId, startBatchId, this.refCount, this.sliceReader.isDisposable()});
        }
        return this.sliceReader;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public PipeBuffer next() {
        ArrayDeque<PipeBuffer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            PipeBuffer buffer = null;
            if (!this.buffers.isEmpty()) {
                buffer = this.buffers.pop();
                if (this.buffers.size() == 0) {
                    this.updateFlushRequested(false);
                }
            }
            return buffer;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean hasNext() {
        ArrayDeque<PipeBuffer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            return this.flushRequested || this.getCurrentNumberOfBuffers() > 0;
        }
    }

    public int getCurrentNumberOfBuffers() {
        Preconditions.checkArgument((boolean)Thread.holdsLock(this.buffers), (Object)"fail to get lock of buffers");
        return this.buffers.size();
    }

    @VisibleForTesting
    public int getNumberOfBuffers() {
        return this.buffers.size();
    }

    private void updateFlushRequested(boolean flushRequested) {
        Preconditions.checkArgument((boolean)Thread.holdsLock(this.buffers), (Object)"fail to get lock of buffers");
        this.flushRequested = flushRequested;
    }

    public boolean disposeIfNotNeed() {
        return this.refCount == 0 && !this.hasNext();
    }

    public Iterator<PipeBuffer> getBufferIterator() {
        return this.buffers.iterator();
    }

    public SliceId getSliceId() {
        return this.sliceId;
    }

    @VisibleForTesting
    public PipelineSliceReader getSliceReader() {
        return this.sliceReader;
    }

    public boolean isReleased() {
        return this.isReleased;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void release() {
        PipelineSliceReader reader;
        int bufferSize;
        ArrayDeque<PipeBuffer> arrayDeque = this.buffers;
        synchronized (arrayDeque) {
            if (this.isReleased) {
                return;
            }
            bufferSize = this.buffers.size();
            this.buffers.clear();
            reader = this.sliceReader;
            this.sliceReader = null;
            this.isReleased = true;
        }
        LOGGER.info("{}: released {} with bufferSize:{}", new Object[]{this.taskLogTag, this.sliceId, bufferSize});
        if (reader != null) {
            reader.release();
        }
        ShuffleDataManager.getInstance().release(this.sliceId);
    }
}

