/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.pipeline.fetcher;

import com.antgroup.geaflow.common.tuple.Tuple;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipeFetcherBuffer;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.OneShardFetcher;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.ShardFetcher;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.ShardFetcherListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SequencedCollection;
import java.util.Set;
import java.util.stream.Collectors;

public class MultiShardFetcher
implements ShardFetcher,
ShardFetcherListener {
    private final ShardFetcher[] shardFetchers;
    private final Set<ShardFetcher> inputFetchersWithRemainingData;
    private final ArrayDeque<ShardFetcher> inputFetchersWithData = new ArrayDeque();
    private final Set<ShardFetcher> enqueuedInputFetchersWithData = new HashSet<ShardFetcher>();
    private final List<ShardFetcherListener> fetcherListeners = new ArrayList<ShardFetcherListener>();
    private final Map<ShardFetcher, Integer> inputFetcherToIndexOffsetMap;
    private final int totalNumberOfInputChannels;

    public MultiShardFetcher(OneShardFetcher ... inputFetchers) {
        this.shardFetchers = inputFetchers;
        Preconditions.checkArgument((inputFetchers.length > 1 ? 1 : 0) != 0, (Object)"Union input fetcher should union at least two input fetchers.");
        if (Arrays.stream(inputFetchers).map(OneShardFetcher::getFetcherIndex).distinct().count() != (long)inputFetchers.length) {
            throw new IllegalArgumentException("Union of two input fetchers with the same index. Given indices: " + Arrays.stream(inputFetchers).map(OneShardFetcher::getFetcherIndex).collect(Collectors.toList()));
        }
        this.inputFetcherToIndexOffsetMap = Maps.newHashMapWithExpectedSize((int)inputFetchers.length);
        this.inputFetchersWithRemainingData = Sets.newHashSetWithExpectedSize((int)inputFetchers.length);
        int currentNumberOfInputChannels = 0;
        for (OneShardFetcher fetcher : inputFetchers) {
            this.inputFetcherToIndexOffsetMap.put((ShardFetcher)Preconditions.checkNotNull((Object)fetcher), currentNumberOfInputChannels);
            this.inputFetchersWithRemainingData.add(fetcher);
            currentNumberOfInputChannels += fetcher.getNumberOfInputChannels();
            fetcher.registerListener(this);
        }
        this.totalNumberOfInputChannels = currentNumberOfInputChannels;
    }

    @Override
    public Optional<PipeFetcherBuffer> getNext() throws IOException, InterruptedException {
        return this.getNext(true);
    }

    @Override
    public Optional<PipeFetcherBuffer> pollNext() throws IOException, InterruptedException {
        return this.getNext(false);
    }

    private Optional<PipeFetcherBuffer> getNext(boolean blocking) throws IOException, InterruptedException {
        if (this.inputFetchersWithRemainingData.isEmpty()) {
            return Optional.empty();
        }
        Optional<ShardFetcher.InputWithData<ShardFetcher, PipeFetcherBuffer>> next = this.getNextInputData(blocking);
        if (!next.isPresent()) {
            return Optional.empty();
        }
        ShardFetcher.InputWithData<ShardFetcher, PipeFetcherBuffer> inputWithData = next.get();
        ShardFetcher shardFetcher = (ShardFetcher)inputWithData.input;
        PipeFetcherBuffer resultBuffer = (PipeFetcherBuffer)inputWithData.data;
        if (resultBuffer.moreAvailable()) {
            this.queueFetcher(shardFetcher);
        }
        int channelIndexOffset = this.inputFetcherToIndexOffsetMap.get(shardFetcher);
        resultBuffer.setChannelIndex(channelIndexOffset + resultBuffer.getChannelIndex());
        resultBuffer.setMoreAvailable(resultBuffer.moreAvailable() || inputWithData.moreAvailable);
        return Optional.of(inputWithData.data);
    }

    private Optional<ShardFetcher.InputWithData<ShardFetcher, PipeFetcherBuffer>> getNextInputData(boolean blocking) throws IOException, InterruptedException {
        boolean moreInputFetchersAvailable;
        ShardFetcher shardFetcher;
        Optional<PipeFetcherBuffer> result;
        do {
            Optional<Tuple<ShardFetcher, Boolean>> fetcherOptional;
            if (!(fetcherOptional = this.getInputFetcher(blocking)).isPresent()) {
                return Optional.empty();
            }
            shardFetcher = (ShardFetcher)fetcherOptional.get().f0;
            moreInputFetchersAvailable = (Boolean)fetcherOptional.get().f1;
        } while (!(result = shardFetcher.pollNext()).isPresent());
        return Optional.of(new ShardFetcher.InputWithData<ShardFetcher, PipeFetcherBuffer>(shardFetcher, result.get(), moreInputFetchersAvailable));
    }

    @Override
    public void requestSlices(long batchId) throws IOException {
        for (ShardFetcher fetcher : this.shardFetchers) {
            fetcher.requestSlices(batchId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerListener(ShardFetcherListener listener) {
        List<ShardFetcherListener> list = this.fetcherListeners;
        synchronized (list) {
            this.fetcherListeners.add(listener);
        }
    }

    @Override
    public void notifyAvailable(ShardFetcher shardFetcher) {
        this.queueFetcher(shardFetcher);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void queueFetcher(ShardFetcher shardFetcher) {
        int availableInputFetchers;
        Preconditions.checkNotNull((Object)shardFetcher);
        SequencedCollection<Object> sequencedCollection = this.inputFetchersWithData;
        synchronized (sequencedCollection) {
            if (this.enqueuedInputFetchersWithData.contains(shardFetcher)) {
                return;
            }
            availableInputFetchers = this.inputFetchersWithData.size();
            this.inputFetchersWithData.add(shardFetcher);
            this.enqueuedInputFetchersWithData.add(shardFetcher);
            if (availableInputFetchers == 0) {
                this.inputFetchersWithData.notifyAll();
            }
        }
        if (availableInputFetchers == 0) {
            sequencedCollection = this.fetcherListeners;
            synchronized (sequencedCollection) {
                for (ShardFetcherListener listener : this.fetcherListeners) {
                    listener.notifyAvailable(this);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<Tuple<ShardFetcher, Boolean>> getInputFetcher(boolean blocking) throws InterruptedException {
        ArrayDeque<ShardFetcher> arrayDeque = this.inputFetchersWithData;
        synchronized (arrayDeque) {
            while (this.inputFetchersWithData.size() == 0) {
                if (blocking) {
                    this.inputFetchersWithData.wait();
                    continue;
                }
                return Optional.empty();
            }
            ShardFetcher shardFetcher = this.inputFetchersWithData.remove();
            this.enqueuedInputFetchersWithData.remove(shardFetcher);
            boolean moreAvailable = !this.enqueuedInputFetchersWithData.isEmpty();
            return Optional.of(Tuple.of((Object)shardFetcher, (Object)moreAvailable));
        }
    }

    @Override
    public int getNumberOfInputChannels() {
        return this.totalNumberOfInputChannels;
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        int totalBuffers = 0;
        for (ShardFetcher fetcher : this.shardFetchers) {
            totalBuffers += fetcher.getNumberOfQueuedBuffers();
        }
        return totalBuffers;
    }

    @VisibleForTesting
    public ShardFetcher[] getShardFetchers() {
        return this.shardFetchers;
    }

    @Override
    public boolean isFinished() {
        for (ShardFetcher fetcher : this.shardFetchers) {
            if (fetcher.isFinished()) continue;
            return false;
        }
        return true;
    }

    @Override
    public void close() {
        for (ShardFetcher fetcher : this.shardFetchers) {
            fetcher.close();
        }
    }
}

