/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.pipeline.fetcher;

import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.shuffle.ShuffleAddress;
import com.antgroup.geaflow.common.tuple.Tuple;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeChannelBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeFetcherBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.AbstractInputChannel;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.LocalInputChannel;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.RemoteInputChannel;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.ShardFetcher;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.ShardFetcherListener;
import com.antgroup.geaflow.shuffle.config.ShuffleConfig;
import com.antgroup.geaflow.shuffle.message.ISliceMeta;
import com.antgroup.geaflow.shuffle.message.PipelineSliceMeta;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.network.ConnectionId;
import com.antgroup.geaflow.shuffle.network.IConnectionManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SequencedCollection;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ShardFetcher} that reads from a fixed set of input channels, one per input slice.
 *
 * <p>All channels are created once in the constructor (see {@link #buildInputChannels}): a slice
 * whose shuffle address equals the local address of the connection manager gets a
 * {@link LocalInputChannel}, every other slice gets a {@link RemoteInputChannel}.
 *
 * <p>Locking as visible in this class:
 * <ul>
 *   <li>{@code requestLock} guards slice requests, re-triggered requests and {@link #isFinished()}.</li>
 *   <li>the monitor of {@code inputChannelsWithData} guards that queue and the
 *       {@code enqueuedInputChannelsWithData} bit set, and is used for wait/notify between
 *       {@link #notifyChannelNonEmpty} and blocking reads.</li>
 *   <li>the monitor of {@code fetcherListeners} guards listener registration and notification.</li>
 * </ul>
 *
 * <p>NOTE(review): this file was produced by a decompiler; the body of {@link #close()} could not be
 * recovered and still has to be restored from the original sources.
 */
public class OneShardFetcher implements ShardFetcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(OneShardFetcher.class);
    /** Connection id used by the constructors that do not take an explicit one. */
    private static final int DEFAULT_CONNECTION_ID = -1;
    /** Input stream name used by the test-only constructor. */
    private static final String DEFAULT_STREAM_NAME = "";

    private final int stageId;
    private final String taskName;
    private final int fetcherIndex;
    /** Guards slice requests, re-triggering and the finished check. */
    private final Object requestLock = new Object();
    /** Registered listeners; every access synchronizes on the list itself. */
    private final List<ShardFetcherListener> fetcherListeners = new ArrayList<>();
    /** Bit i is set while the channel with channel index i sits in {@link #inputChannelsWithData}. */
    private final BitSet enqueuedInputChannelsWithData;
    /** FIFO of channels that announced data; its monitor also guards the bit set above. */
    private final ArrayDeque<AbstractInputChannel> inputChannelsWithData = new ArrayDeque<>();
    /** Channel to read from next without going through the queue, or {@code null}. */
    private AbstractInputChannel nextInputChannel;
    private volatile boolean isReleased;
    // Set to true in the constructor; nothing visible in this class clears it
    // (the body of close() is unavailable).
    private volatile boolean isRunning;
    /** Lazily created daemon timer handed to local channels on re-trigger. */
    private Timer retriggerLocalRequestTimer;
    private final ExecutorService retriggerRemoteExecutor;
    /** Failure recorded by an asynchronous re-trigger; surfaced by {@link #checkError()}. */
    private final AtomicReference<Throwable> cause = new AtomicReference<>();
    protected final int numberOfInputChannels;
    protected final Map<SliceId, AbstractInputChannel> inputChannels;
    protected final IConnectionManager connectionManager;
    protected final String inputStream;

    /**
     * Creates the fetcher and immediately builds one input channel per entry of {@code inputSlices}.
     *
     * <p>NOTE(review): this constructor calls the overridable {@link #buildInputChannels}; subclasses
     * overriding it run before their own fields are initialized.
     *
     * @param stageId           stage id, exposed through {@link #getStageId()}
     * @param taskName          task name used in log messages; must not be {@code null}
     * @param fetcherIndex      index exposed through {@link #getFetcherIndex()}
     * @param connectionId      id used to build the {@link ConnectionId} of remote channels
     * @param inputStream       stream name attached to every returned {@link PipeFetcherBuffer}
     * @param inputSlices       slices to read; each element is cast to {@link PipelineSliceMeta}
     * @param startBatchId      initial batch id passed to every channel
     * @param connectionManager source of the executor, the shuffle config and the local address
     * @throws NullPointerException if {@code taskName} is {@code null}
     */
    public OneShardFetcher(int stageId, String taskName, int fetcherIndex, int connectionId, String inputStream, List<? extends ISliceMeta> inputSlices, long startBatchId, IConnectionManager connectionManager) {
        this.stageId = stageId;
        this.inputStream = inputStream;
        this.taskName = Preconditions.checkNotNull(taskName);
        this.fetcherIndex = fetcherIndex;
        this.numberOfInputChannels = inputSlices.size();
        this.inputChannels = new HashMap<>(this.numberOfInputChannels);
        this.enqueuedInputChannelsWithData = new BitSet(this.numberOfInputChannels);
        this.connectionManager = connectionManager;
        this.retriggerRemoteExecutor = connectionManager.getExecutor();
        this.isReleased = false;
        this.isRunning = true;
        ShuffleConfig nettyConfig = connectionManager.getShuffleConfig();
        int initialBackoff = nettyConfig.getConnectInitialBackoffMs();
        int maxBackoff = nettyConfig.getConnectMaxBackoffMs();
        this.buildInputChannels(connectionId, inputSlices, initialBackoff, maxBackoff, startBatchId);
    }

    /** Same as the full constructor, using {@code DEFAULT_CONNECTION_ID} as connection id. */
    public OneShardFetcher(int stageId, String taskName, int fetcherIndex, String inputStream, List<? extends ISliceMeta> inputSlices, long startBatchId, IConnectionManager connectionManager) {
        this(stageId, taskName, fetcherIndex, DEFAULT_CONNECTION_ID, inputStream, inputSlices, startBatchId, connectionManager);
    }

    /** Test-only constructor using the default connection id and the default (empty) stream name. */
    @VisibleForTesting
    public OneShardFetcher(int stageId, String taskName, int fetcherIndex, List<? extends ISliceMeta> inputSlices, long startBatchId, IConnectionManager connectionManager) {
        this(stageId, taskName, fetcherIndex, DEFAULT_CONNECTION_ID, DEFAULT_STREAM_NAME, inputSlices, startBatchId, connectionManager);
    }

    /**
     * Creates one channel per input slice and registers it in {@link #inputChannels} under its
     * slice id. The position of the slice in {@code inputSlices} becomes the channel index.
     *
     * @param connectionId   id used for the {@link ConnectionId} of remote channels
     * @param inputSlices    slices to read; each element must be a {@link PipelineSliceMeta}
     * @param initialBackoff initial backoff (ms) passed to every channel
     * @param maxBackoff     maximum backoff (ms) passed to every channel
     * @param initialBatchId initial batch id passed to every channel
     */
    protected void buildInputChannels(int connectionId, List<? extends ISliceMeta> inputSlices, int initialBackoff, int maxBackoff, long initialBatchId) {
        int localChannels = 0;
        ShuffleAddress localAddr = this.connectionManager.getShuffleAddress();
        for (int inputChannelIdx = 0; inputChannelIdx < this.numberOfInputChannels; ++inputChannelIdx) {
            PipelineSliceMeta task = (PipelineSliceMeta) inputSlices.get(inputChannelIdx);
            ShuffleAddress address = task.getShuffleAddress();
            SliceId inputSlice = task.getSliceId();
            AbstractInputChannel inputChannel;
            if (address.equals(localAddr)) {
                // Slice lives on this node: read it without going through the network.
                inputChannel = new LocalInputChannel(this, inputSlice, inputChannelIdx, initialBackoff, maxBackoff, initialBatchId);
                ++localChannels;
            } else {
                inputChannel = new RemoteInputChannel(this, inputSlice, inputChannelIdx, new ConnectionId(address, connectionId), initialBackoff, maxBackoff, initialBatchId, this.connectionManager);
            }
            this.inputChannels.put(inputSlice, inputChannel);
        }
        LOGGER.info("{} create {} local channels in {} channels", this.taskName, localChannels, this.numberOfInputChannels);
    }

    /**
     * Requests the slice of the given batch on every input channel.
     *
     * @param batchId batch to request
     * @throws IllegalStateException if the fetcher is released, or if the number of registered
     *                               channels differs from the number of input slices
     */
    @Override
    public void requestSlices(long batchId) {
        synchronized (this.requestLock) {
            if (this.isReleased) {
                throw new IllegalStateException("Already released.");
            }
            if (this.numberOfInputChannels != this.inputChannels.size()) {
                // Arguments follow the order of the labels in the message: total first, currently set second.
                throw new IllegalStateException(String.format("Bug in input fetcher setup logic: mismatch between number of total input channels [%s] and the currently set number of input channels [%s].", this.numberOfInputChannels, this.inputChannels.size()));
            }
            this.internalRequestSlices(batchId);
        }
    }

    /**
     * Requests the batch on each channel in turn. The first failure is recorded on the failing
     * channel via {@code setError} and stops the iteration; remaining channels are not requested.
     */
    private void internalRequestSlices(long batchId) {
        for (AbstractInputChannel inputChannel : this.inputChannels.values()) {
            try {
                inputChannel.requestSlice(batchId);
            } catch (Throwable t) {
                inputChannel.setError(t);
                return;
            }
        }
        LOGGER.info("{} request next batch:{}", this.taskName, batchId);
    }

    /**
     * Schedules a re-trigger of the slice request of {@code inputChannel} on the remote executor.
     * A failure of the scheduled task is stored and rethrown by a later {@link #checkError()}.
     *
     * @param inputChannel channel whose slice request is re-triggered
     * @throws GeaflowRuntimeException if an earlier asynchronous re-trigger has failed
     */
    public void retriggerFetchRequest(RemoteInputChannel inputChannel) {
        this.checkError();
        this.retriggerRemoteExecutor.execute(() -> {
            try {
                this.retriggerFetchRequest(inputChannel.getInputSliceId());
            } catch (Throwable e) {
                this.cause.set(e);
            }
        });
    }

    /**
     * Re-triggers the slice request of the channel registered for {@code sliceId}. Does nothing
     * once the fetcher is released. Local channels are handed a lazily created daemon timer.
     *
     * @param sliceId id of the slice whose request is re-triggered
     * @throws IOException          declared for the channel calls
     * @throws NullPointerException if no channel is registered for {@code sliceId}
     */
    public void retriggerFetchRequest(SliceId sliceId) throws IOException {
        synchronized (this.requestLock) {
            if (this.isReleased) {
                return;
            }
            AbstractInputChannel ch = Preconditions.checkNotNull(this.inputChannels.get(sliceId), "Unknown input channel with ID %s", sliceId);
            if (ch instanceof RemoteInputChannel) {
                ((RemoteInputChannel) ch).retriggerSliceRequest(sliceId);
            } else {
                LocalInputChannel localChannel = (LocalInputChannel) ch;
                if (this.retriggerLocalRequestTimer == null) {
                    this.retriggerLocalRequestTimer = new Timer(true);
                }
                localChannel.retriggerSliceRequest(this.retriggerLocalRequestTimer);
            }
        }
    }

    /** Rethrows a failure recorded by an asynchronous re-trigger, keeping it as the cause. */
    private void checkError() {
        Throwable t = this.cause.get();
        if (t != null) {
            throw new GeaflowRuntimeException(t.getMessage(), t);
        }
    }

    /** Blocking variant of the read: waits until a channel announces data. */
    @Override
    public Optional<PipeFetcherBuffer> getNext() throws IOException, InterruptedException {
        return this.getNext(true);
    }

    /** Non-blocking variant of the read: returns empty when no channel is queued. */
    @Override
    public Optional<PipeFetcherBuffer> pollNext() throws IOException, InterruptedException {
        return this.getNext(false);
    }

    /**
     * Reads the next buffer and wraps it together with its channel index, slice id and the input
     * stream name.
     *
     * @param blocking whether to wait for a channel with data
     * @return the next buffer, or empty if the fetcher is not running or no data was obtained
     * @throws IOException             if the fetcher has been released
     * @throws GeaflowRuntimeException if an asynchronous re-trigger has failed
     */
    private Optional<PipeFetcherBuffer> getNext(boolean blocking) throws IOException, InterruptedException {
        if (!this.isRunning) {
            return Optional.empty();
        }
        if (this.isReleased) {
            throw new IOException("Input fetcher is already closed.");
        }
        this.checkError();
        Optional<ShardFetcher.InputWithData<AbstractInputChannel, PipeChannelBuffer>> next = this.getNextInputData(blocking);
        if (!next.isPresent()) {
            return Optional.empty();
        }
        ShardFetcher.InputWithData<AbstractInputChannel, PipeChannelBuffer> inputWithData = next.get();
        PipeChannelBuffer channelBuffer = inputWithData.data;
        PipeFetcherBuffer fetcherBuffer = new PipeFetcherBuffer(channelBuffer.getBuffer(), inputWithData.input.getChannelIndex(), inputWithData.moreAvailable, channelBuffer.getSliceId(), this.inputStream);
        return Optional.of(fetcherBuffer);
    }

    /**
     * Picks a channel (the sticky {@link #nextInputChannel} if set, otherwise the head of the
     * queue) and reads from it until some channel yields a buffer.
     *
     * <p>After a successful read: if the buffer reports more data and is a data buffer, the channel
     * stays sticky; if it reports more data but is not a data buffer, the channel is re-queued;
     * otherwise the sticky channel is cleared.
     */
    private Optional<ShardFetcher.InputWithData<AbstractInputChannel, PipeChannelBuffer>> getNextInputData(boolean blocking) throws IOException, InterruptedException {
        AbstractInputChannel currentChannel;
        boolean moreAvailable;
        Optional<PipeChannelBuffer> result;
        while (true) {
            if (this.nextInputChannel != null) {
                currentChannel = this.nextInputChannel;
                synchronized (this.inputChannelsWithData) {
                    moreAvailable = !this.inputChannelsWithData.isEmpty();
                }
            } else {
                Optional<Tuple<AbstractInputChannel, Boolean>> inputChannel = this.getChannel(blocking);
                if (!inputChannel.isPresent()) {
                    return Optional.empty();
                }
                currentChannel = inputChannel.get().f0;
                if (currentChannel.isReleased()) {
                    // Skip released channels and take the next queued one.
                    continue;
                }
                moreAvailable = inputChannel.get().f1;
            }
            // NOTE(review): if the sticky channel returns no buffer, this loop retries the same
            // channel immediately (also in non-blocking mode) — confirm that cannot happen.
            result = currentChannel.getNext();
            if (result.isPresent()) {
                break;
            }
        }
        PipeChannelBuffer channelBuffer = result.get();
        if (channelBuffer.moreAvailable()) {
            moreAvailable = true;
            if (channelBuffer.getBuffer().isData()) {
                this.nextInputChannel = currentChannel;
            } else {
                this.queueChannel(currentChannel);
                this.nextInputChannel = null;
            }
        } else {
            this.nextInputChannel = null;
        }
        return Optional.of(new ShardFetcher.InputWithData<AbstractInputChannel, PipeChannelBuffer>(currentChannel, channelBuffer, moreAvailable));
    }

    /** Registers a listener that is notified when the queue of channels with data becomes non-empty. */
    @Override
    public void registerListener(ShardFetcherListener inputFetcherListener) {
        synchronized (this.fetcherListeners) {
            this.fetcherListeners.add(inputFetcherListener);
        }
    }

    /**
     * Called to announce that {@code channel} has data; queues the channel unless already queued.
     *
     * @throws NullPointerException if {@code channel} is {@code null}
     */
    public void notifyChannelNonEmpty(AbstractInputChannel channel) {
        this.queueChannel(Preconditions.checkNotNull(channel));
    }

    /**
     * Appends {@code channel} to the queue unless its bit is already set. When the queue was empty
     * before, waiting readers are woken and all registered listeners are notified (outside the
     * queue monitor, under the listener-list monitor).
     */
    private void queueChannel(AbstractInputChannel channel) {
        boolean wasEmpty;
        synchronized (this.inputChannelsWithData) {
            int channelIndex = channel.getChannelIndex();
            if (this.enqueuedInputChannelsWithData.get(channelIndex)) {
                return;
            }
            wasEmpty = this.inputChannelsWithData.isEmpty();
            this.inputChannelsWithData.add(channel);
            this.enqueuedInputChannelsWithData.set(channelIndex);
            if (wasEmpty) {
                this.inputChannelsWithData.notifyAll();
            }
        }
        if (wasEmpty) {
            synchronized (this.fetcherListeners) {
                for (ShardFetcherListener listener : this.fetcherListeners) {
                    listener.notifyAvailable(this);
                }
            }
        }
    }

    /**
     * Returns the channel to read from next, paired with a flag telling whether further channels
     * remain queued. The sticky channel, if set, is returned with the flag {@code true}.
     *
     * @param blocking whether to wait on the queue monitor while the queue is empty
     * @return the channel and flag, or empty if the fetcher is not running or (non-blocking) the
     *         queue is empty
     * @throws IllegalStateException if the queue is empty and the fetcher has been released
     */
    private Optional<Tuple<AbstractInputChannel, Boolean>> getChannel(boolean blocking) throws InterruptedException {
        if (this.nextInputChannel != null) {
            return Optional.of(Tuple.of(this.nextInputChannel, true));
        }
        synchronized (this.inputChannelsWithData) {
            while (this.inputChannelsWithData.isEmpty()) {
                if (!this.isRunning) {
                    return Optional.empty();
                }
                if (this.isReleased) {
                    throw new IllegalStateException("Channel released");
                }
                if (!blocking) {
                    return Optional.empty();
                }
                this.inputChannelsWithData.wait();
            }
            AbstractInputChannel inputChannel = this.inputChannelsWithData.remove();
            this.enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
            // Must be a real boolean: the caller reads this component as Boolean.
            boolean moreQueued = !this.inputChannelsWithData.isEmpty();
            return Optional.of(Tuple.of(inputChannel, moreQueued));
        }
    }

    @Override
    public int getNumberOfInputChannels() {
        return this.numberOfInputChannels;
    }

    /** Returns the live (not copied) map of slice id to input channel. */
    public Map<SliceId, AbstractInputChannel> getInputChannels() {
        return this.inputChannels;
    }

    public int getFetcherIndex() {
        return this.fetcherIndex;
    }

    public int getStageId() {
        return this.stageId;
    }

    /**
     * Sums the queued buffers of all remote channels. Best effort: the computation is attempted up
     * to three times and any exception is deliberately swallowed; if all attempts fail, 0 is
     * returned.
     */
    @Override
    public int getNumberOfQueuedBuffers() {
        for (int retry = 0; retry < 3; ++retry) {
            try {
                int totalBuffers = 0;
                for (AbstractInputChannel channel : this.inputChannels.values()) {
                    if (channel instanceof RemoteInputChannel) {
                        totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
                    }
                }
                return totalBuffers;
            } catch (Exception ignored) {
                // Intentionally ignored: this is a monitoring value computed without locking;
                // presumably guards against concurrent changes while iterating — just retry.
            }
        }
        return 0;
    }

    /** Returns {@code true} when every input channel reports itself as released. */
    @Override
    public boolean isFinished() {
        synchronized (this.requestLock) {
            for (AbstractInputChannel inputChannel : this.inputChannels.values()) {
                if (!inputChannel.isReleased()) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * NOT RECOVERED: the decompiler (CFR) failed on this method with
     * {@code ConfusedCFRException: Started 2 blocks at once}, so the original body is unknown and
     * this placeholder always throws.
     *
     * <p>TODO: restore the implementation from the original sources. Presumably it marks the
     * fetcher as released, releases the channels, cancels {@code retriggerLocalRequestTimer} and
     * wakes readers waiting on {@code inputChannelsWithData} — verify against the original before
     * relying on any of this.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void close() {
        throw new IllegalStateException("Decompilation failed");
    }
}

