/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.pipeline.channel;

import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeChannelBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.AbstractInputChannel;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.OneShardFetcher;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceListener;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceReader;
import com.antgroup.geaflow.shuffle.memory.ShuffleDataManager;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.util.SliceNotFoundException;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input channel that reads its slice through a {@link PipelineSliceReader}
 * obtained from the {@link ShuffleDataManager} singleton.
 *
 * <p>The reader is created lazily by {@link #requestSlice(long)}. Creation and
 * the "already requested" checks are guarded by {@code requestLock} so that a
 * direct request and a timer-driven retry cannot both create a reader.
 */
public class LocalInputChannel
        extends AbstractInputChannel
        implements PipelineSliceListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalInputChannel.class);

    /** Guards creation of {@link #sliceReader}. */
    private final Object requestLock = new Object();
    /** Reader for the input slice; {@code null} until requested and again after release. */
    private PipelineSliceReader sliceReader;
    private volatile boolean isReleased;

    /**
     * Creates the channel; all arguments are forwarded unchanged to the parent constructor.
     *
     * @param fetcher        fetcher that owns this channel
     * @param inputSlice     id of the slice this channel reads
     * @param channelIndex   index of this channel
     * @param initialBackoff initial backoff handed to the parent
     * @param maxBackoff     maximum backoff handed to the parent
     * @param startBatchId   first batch id handed to the parent
     */
    public LocalInputChannel(OneShardFetcher fetcher, SliceId inputSlice, int channelIndex, int initialBackoff, int maxBackoff, long startBatchId) {
        super(channelIndex, fetcher, inputSlice, initialBackoff, maxBackoff, startBatchId);
    }

    /**
     * Requests the slice, or, if a reader already exists, advances its requested batch id.
     *
     * <p>On the first call a reader is created for {@code initialBatchId} and the channel is
     * announced as non-empty. If the slice cannot be found and {@code increaseBackoff()}
     * returns {@code true}, the request is handed back to the fetcher via
     * {@code retriggerFetchRequest} instead of failing; the exception is only propagated
     * once {@code increaseBackoff()} returns {@code false}.
     *
     * @param batchId batch id forwarded to an already existing reader
     * @throws SliceNotFoundException if the slice is not found and no further backoff is granted
     * @throws IOException            declared by the overridden method
     * @throws IllegalStateException  if the channel has already been released
     */
    @Override
    public void requestSlice(long batchId) throws IOException {
        boolean retriggerRequest = false;
        boolean notifyAvailable = false;
        synchronized (this.requestLock) {
            Preconditions.checkState(!this.isReleased, "LocalInputChannel has been released already");
            if (this.sliceReader == null) {
                LOGGER.info("Requesting Local slice {}", this.inputSliceId);
                try {
                    this.sliceReader = ShuffleDataManager.getInstance().createSliceReader(this.inputSliceId, this.initialBatchId, this);
                    notifyAvailable = true;
                } catch (SliceNotFoundException notFound) {
                    boolean canRetry = this.increaseBackoff();
                    LOGGER.warn("not found slice:{}", this.inputSliceId);
                    if (!canRetry) {
                        throw notFound;
                    }
                    // Rethrowing here would skip the retrigger call below and make
                    // the backoff logic unreachable, so only fail when no retry is left.
                    retriggerRequest = true;
                }
            } else {
                this.sliceReader.updateRequestedBatchId(batchId);
                notifyAvailable = this.sliceReader.hasNext();
            }
        }
        // Callbacks into the fetcher are made after leaving requestLock.
        if (notifyAvailable) {
            this.notifyDataAvailable();
        }
        if (retriggerRequest) {
            this.inputFetcher.retriggerFetchRequest(this.inputSliceId);
        }
    }

    /**
     * Schedules another {@link #requestSlice(long)} for {@code initialBatchId} on the given
     * timer, delayed by {@code getCurrentBackoff()}. Any throwable raised by the retried
     * request is recorded through {@code setError}.
     *
     * @param timer timer on which the retry is scheduled
     * @throws IllegalStateException if a reader has already been created
     */
    public void retriggerSliceRequest(Timer timer) {
        synchronized (this.requestLock) {
            Preconditions.checkState(this.sliceReader == null, "already requested slice");
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        LocalInputChannel.this.requestSlice(LocalInputChannel.this.initialBatchId);
                    } catch (Throwable t) {
                        // The timer thread has no caller to propagate to.
                        LocalInputChannel.this.setError(t);
                    }
                }
            }, this.getCurrentBackoff());
        }
    }

    /**
     * Returns the next buffer of the slice, tagged with this channel's slice id.
     *
     * @return the next buffer, or an empty optional when the reader has nothing to return
     *         or the channel was released before a reader existed
     * @throws IOException           declared by the overridden method
     * @throws IllegalStateException if no reader has been created yet
     */
    @Override
    public Optional<PipeChannelBuffer> getNext() throws IOException {
        this.checkError();
        PipelineSliceReader reader = this.sliceReader;
        if (reader == null) {
            if (this.isReleased) {
                return Optional.empty();
            }
            // Re-read under the lock; fails if the slice was never requested.
            reader = this.checkAndGetSliceReader();
        }
        PipeChannelBuffer next = reader.next();
        if (next == null) {
            return Optional.empty();
        }
        next.setSliceId(this.inputSliceId);
        return Optional.of(next);
    }

    /** Forwards the notification to {@code notifyChannelNonEmpty()}. */
    @Override
    public void notifyDataAvailable() {
        this.notifyChannelNonEmpty();
    }

    /**
     * Returns the reader while holding {@code requestLock}.
     *
     * @throws IllegalStateException if the channel is released or no reader exists
     */
    private PipelineSliceReader checkAndGetSliceReader() {
        synchronized (this.requestLock) {
            Preconditions.checkState(!this.isReleased, "released");
            Preconditions.checkState(this.sliceReader != null, "reader is not ready.");
            return this.sliceReader;
        }
    }

    @Override
    public boolean isReleased() {
        return this.isReleased;
    }

    /**
     * Marks the channel released and releases the reader if one exists.
     * Calls after the first are no-ops.
     *
     * <p>NOTE(review): this method does not take {@code requestLock}, so a release that
     * overlaps a {@link #requestSlice(long)} creating the reader may miss it — confirm
     * whether callers serialize the two.
     */
    @Override
    public void release() {
        if (!this.isReleased) {
            this.isReleased = true;
            PipelineSliceReader reader = this.sliceReader;
            if (reader != null) {
                reader.release();
                this.sliceReader = null;
            }
        }
    }

    @Override
    public String toString() {
        return "LocalInputChannel [" + this.inputSliceId + "]";
    }
}

