/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.pipeline.channel;

import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeChannelBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.AbstractInputChannel;
import com.antgroup.geaflow.shuffle.api.pipeline.channel.ChannelId;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.OneShardFetcher;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.network.ConnectionId;
import com.antgroup.geaflow.shuffle.network.IConnectionManager;
import com.antgroup.geaflow.shuffle.network.netty.ConnectionManager;
import com.antgroup.geaflow.shuffle.network.netty.SliceRequestClient;
import com.antgroup.geaflow.shuffle.util.SliceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Input channel that obtains the buffers of one slice through a
 * {@link SliceRequestClient} created from a {@link ConnectionManager}.
 *
 * <p>Buffers handed in via {@link #onBuffer(PipeBuffer, int)} are queued in
 * {@code receivedBuffers} and handed out one at a time by {@link #getNext()}.
 * Every access to the queue and to {@code expectedSequenceNumber} happens while
 * holding the monitor of {@code receivedBuffers}.
 */
public class RemoteInputChannel extends AbstractInputChannel {

    /** Identifier of this channel, generated once per instance. */
    private final ChannelId id = new ChannelId();

    /** Connection used to create the request client and to close open connections. */
    private final ConnectionId connectionId;

    private final ConnectionManager connectionManager;

    /** Buffers received but not yet consumed; also serves as the lock object. */
    private final ArrayDeque<PipeBuffer> receivedBuffers = new ArrayDeque<>();

    /** Flipped to {@code true} exactly once, by {@link #release()}. */
    private final AtomicBoolean isReleased = new AtomicBoolean();

    /** Created on the first {@link #requestSlice(long)} call; {@code null} before that. */
    private volatile SliceRequestClient sliceRequestClient;

    /** Sequence number the next incoming buffer must carry; guarded by {@code receivedBuffers}. */
    private int expectedSequenceNumber = 0;

    /**
     * Creates the channel.
     *
     * @param fetcher           forwarded to the superclass constructor
     * @param inputSlice        forwarded to the superclass constructor
     * @param channelIndex      forwarded to the superclass constructor
     * @param connectionId      connection to request data from; must not be {@code null}
     * @param initialBackoff    forwarded to the superclass constructor
     * @param maxBackoff        forwarded to the superclass constructor
     * @param startBatchId      forwarded to the superclass constructor
     * @param connectionManager must be a {@link ConnectionManager} instance, otherwise a
     *                          {@link ClassCastException} is thrown
     */
    public RemoteInputChannel(OneShardFetcher fetcher, SliceId inputSlice, int channelIndex,
                              ConnectionId connectionId, int initialBackoff, int maxBackoff,
                              long startBatchId, IConnectionManager connectionManager) {
        super(channelIndex, fetcher, inputSlice, initialBackoff, maxBackoff, startBatchId);
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.connectionManager = (ConnectionManager) connectionManager;
    }

    /**
     * Requests data for this channel.
     *
     * <p>On the first call the request client is created and the slice is requested with a
     * backoff of {@code 0} and {@code initialBatchId}; {@code batchId} is not used on that
     * path. Every later call asks the existing client for the batch {@code batchId}.
     *
     * @param batchId batch to request once the client exists
     */
    @Override
    public void requestSlice(long batchId) throws IOException, InterruptedException {
        if (sliceRequestClient != null) {
            sliceRequestClient.requestNextBatch(batchId, this);
            return;
        }
        sliceRequestClient = connectionManager.createSliceRequestClient(connectionId);
        sliceRequestClient.requestSlice(inputSliceId, this, 0, initialBatchId);
    }

    /**
     * Re-issues the slice request with the current backoff, or records a
     * {@link SliceNotFoundException} on this channel when {@code increaseBackoff()}
     * returns {@code false}.
     *
     * @param sliceId slice to request again
     * @throws IllegalStateException if the request client has not been created yet
     */
    public void retriggerSliceRequest(SliceId sliceId) throws IOException {
        checkClientInitialized();
        checkError();
        if (!increaseBackoff()) {
            setError(new SliceNotFoundException(sliceId));
            return;
        }
        sliceRequestClient.requestSlice(sliceId, this, getCurrentBackoff(), initialBatchId);
    }

    /**
     * Removes and returns the head of the received-buffer queue.
     *
     * @return the next buffer, flagged with whether more buffers were queued at poll time
     * @throws IOException           if the queue is empty and the channel has been released
     * @throws IllegalStateException if the request client has not been created yet, or the
     *                               queue is empty although the channel is not released
     */
    @Override
    public Optional<PipeChannelBuffer> getNext() throws IOException {
        checkClientInitialized();
        // Inherited check; presumably surfaces an error recorded via setError - confirm in AbstractInputChannel.
        checkError();

        final PipeBuffer head;
        final boolean hasMore;
        synchronized (receivedBuffers) {
            head = receivedBuffers.poll();
            hasMore = !receivedBuffers.isEmpty();
        }

        if (head != null) {
            return Optional.of(new PipeChannelBuffer(head, hasMore, inputSliceId));
        }
        if (isReleased.get()) {
            throw new IOException("Queried for a buffer after channel has been released.");
        }
        throw new IllegalStateException("There should always have queued buffers for unreleased channel.");
    }

    /** Returns whether {@link #release()} has been called. */
    @Override
    public boolean isReleased() {
        return isReleased.get();
    }

    /**
     * Releases the channel. Only the first call has any effect: it clears the queued
     * buffers and then either closes this channel on the request client or, when no client
     * was ever created, asks the connection manager to close the open connections for
     * {@code connectionId}.
     */
    @Override
    public void release() throws IOException {
        if (!isReleased.compareAndSet(false, true)) {
            return;
        }
        synchronized (receivedBuffers) {
            receivedBuffers.clear();
        }
        if (sliceRequestClient != null) {
            sliceRequestClient.close(this);
        } else {
            connectionManager.closeOpenChannelConnections(connectionId);
        }
    }

    @Override
    public String toString() {
        return "RemoteInputChannel [" + inputSliceId + " at " + connectionId + "]";
    }

    /** Returns the number of buffers currently queued and not yet consumed. */
    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }

    /** Returns the identifier of this channel. */
    public ChannelId getInputChannelId() {
        return id;
    }

    /** Returns the request client, or {@code null} if no slice has been requested yet. */
    @VisibleForTesting
    public SliceRequestClient getSliceRequestClient() {
        return sliceRequestClient;
    }

    /**
     * Queues a received buffer.
     *
     * <p>The buffer is ignored if the channel is already released. If
     * {@code sequenceNumber} differs from the expected one, a reordering error is recorded
     * through {@link #onError(Throwable)} and the buffer is not queued. Otherwise the buffer
     * is appended, the expected sequence number advances by one, and
     * {@code notifyChannelNonEmpty()} is called when the queue was empty beforehand.
     *
     * @param buffer         buffer to enqueue
     * @param sequenceNumber sequence number carried by the buffer
     */
    public void onBuffer(PipeBuffer buffer, int sequenceNumber) throws IOException {
        synchronized (receivedBuffers) {
            if (isReleased.get()) {
                return;
            }
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new ReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }
            boolean wasEmpty = receivedBuffers.isEmpty();
            receivedBuffers.add(buffer);
            ++expectedSequenceNumber;
            // The notification is issued while still holding the queue lock.
            if (wasEmpty) {
                notifyChannelNonEmpty();
            }
        }
    }

    /**
     * Handles a message that carries a sequence number but no buffer: advances the expected
     * sequence number on a match, records a reordering error on a mismatch, and does
     * nothing once the channel is released.
     *
     * @param sequenceNumber sequence number carried by the empty message
     */
    public void onEmptyBuffer(int sequenceNumber) throws IOException {
        synchronized (receivedBuffers) {
            if (isReleased.get()) {
                return;
            }
            if (expectedSequenceNumber == sequenceNumber) {
                ++expectedSequenceNumber;
            } else {
                onError(new ReorderingException(expectedSequenceNumber, sequenceNumber));
            }
        }
    }

    /** Asks the owning fetcher to retrigger the fetch request for this channel. */
    public void onFailedFetchRequest() {
        inputFetcher.retriggerFetchRequest(this);
    }

    /**
     * Records {@code cause} on this channel via the inherited {@code setError}.
     *
     * @param cause the failure to record
     */
    public void onError(Throwable cause) {
        setError(cause);
    }

    /** Fails with {@link IllegalStateException} if no request client has been created yet. */
    private void checkClientInitialized() {
        Preconditions.checkState(sliceRequestClient != null,
            "Bug: client is not initialized before request data.");
    }

    /** Raised when a buffer arrives with a sequence number other than the expected one. */
    private static class ReorderingException extends IOException {
        private static final long serialVersionUID = -888282210356266816L;

        private final int expectedSequenceNumber;
        private final int actualSequenceNumber;

        ReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
            this.expectedSequenceNumber = expectedSequenceNumber;
            this.actualSequenceNumber = actualSequenceNumber;
        }

        /** Builds the message on demand from the two sequence numbers. */
        @Override
        public String getMessage() {
            return String.format(
                "Buffer re-ordering: expected buffer with sequence number %d, but received %d.",
                expectedSequenceNumber, actualSequenceNumber);
        }
    }
}

