/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.reader;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.metric.ShuffleReadMetrics;
import com.antgroup.geaflow.shuffle.api.reader.AbstractFetcher;
import com.antgroup.geaflow.shuffle.api.reader.FetchContext;
import com.antgroup.geaflow.shuffle.api.reader.IShuffleFetcher;
import com.antgroup.geaflow.shuffle.api.reader.PipelineFetcher;
import com.antgroup.geaflow.shuffle.api.reader.ShuffleFetcherFactory;
import com.antgroup.geaflow.shuffle.message.BaseSliceMeta;
import com.antgroup.geaflow.shuffle.message.ISliceMeta;
import com.antgroup.geaflow.shuffle.message.PipelineBarrier;
import com.antgroup.geaflow.shuffle.message.PipelineEvent;
import com.antgroup.geaflow.shuffle.network.IConnectionManager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetcher that reads every input slice with a positive record count through a
 * {@link PipelineFetcher} and synthesizes a {@link PipelineBarrier} for every
 * other slice instead of fetching it.
 *
 * <p>Iteration first drains the delegate fetchers registered in
 * {@link #fetcherList}, in order, and only afterwards emits the queued
 * barriers.
 *
 * <p>Usage order is {@link #setup}, then {@link #init} per fetch request,
 * then {@link #hasNext()}/{@link #next()} pairs.
 */
public class HybridFetcher extends AbstractFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(HybridFetcher.class);

    /** Delegate used for all slices that carry records; created in {@link #setup}. */
    protected PipelineFetcher pipelineFetcher;
    /** Delegates to drain, index-aligned with {@link #sliceMapList}. */
    protected List<IShuffleFetcher> fetcherList;
    /** Slice maps handed to the delegates, index-aligned with {@link #fetcherList}. */
    protected List<Map<Integer, List<ISliceMeta>>> sliceMapList;
    /** Barriers synthesized for slices without records; emitted after all delegates are drained. */
    private Queue<PipelineBarrier> barrierQueue;
    /** Index into {@link #fetcherList}/{@link #sliceMapList} of the delegate currently being drained. */
    private int curInputIndex;
    /** Delegate currently being drained, or {@code null} when there is none. */
    private IShuffleFetcher curFetcher;
    /** Event buffered by {@link #hasNext()} and handed out by {@link #next()}. */
    private PipelineEvent curValue;
    /** Context of the current fetch request; its input slice map is rewritten per delegate. */
    private FetchContext fetchContext;

    /**
     * Performs the parent setup, obtains the pipeline delegate from
     * {@link ShuffleFetcherFactory} and allocates the internal collections.
     *
     * @param connectionManager connection manager forwarded to the parent and the factory
     * @param config            configuration forwarded to the parent and the factory
     */
    @Override
    public void setup(IConnectionManager connectionManager, Configuration config) {
        super.setup(connectionManager, config);
        this.pipelineFetcher = ShuffleFetcherFactory.getPipelineFetcher(connectionManager, config);
        this.sliceMapList = new ArrayList<>();
        this.fetcherList = new ArrayList<>();
        this.barrierQueue = new LinkedList<>();
    }

    /**
     * Prepares this fetcher for a new request: resets the delegate lists,
     * partitions the request's input slices and initializes the first delegate.
     *
     * <p>When the request has no input slices, the fetch-slice metric is set
     * to zero and no delegate is initialized. The barrier queue is not cleared
     * here.
     *
     * @param fetchContext context of the request; {@code setInputSliceMap} is called on it
     *                     before a delegate is initialized
     */
    @Override
    public void init(FetchContext fetchContext) {
        super.init(fetchContext);
        this.sliceMapList.clear();
        this.fetcherList.clear();
        this.fetchContext = fetchContext;
        this.curFetcher = null;

        Map<Integer, List<ISliceMeta>> sliceMap = fetchContext.getRequest().getInputSlices();
        if (sliceMap.isEmpty()) {
            this.readMetrics.setFetchSlices(0);
            return;
        }

        this.splitRemoteSlices(sliceMap);
        this.readMetrics.setFetchSlices(this.totalSliceNum);
        this.curInputIndex = 0;
        if (!this.sliceMapList.isEmpty()) {
            this.initNextFetcher();
        }
    }

    /**
     * Makes the delegate at {@link #curInputIndex} current and initializes it
     * with the matching slice map, which is first written into the shared
     * fetch context.
     */
    private void initNextFetcher() {
        Map<Integer, List<ISliceMeta>> sliceMetas = this.sliceMapList.get(this.curInputIndex);
        this.curFetcher = this.fetcherList.get(this.curInputIndex);
        this.fetchContext.setInputSliceMap(sliceMetas);
        this.curFetcher.init(this.fetchContext);
    }

    /**
     * Buffers the next event, if any, so that {@link #next()} can return it.
     *
     * <p>Events come from the delegate fetchers first; once none of them has
     * data left, queued barriers are polled one at a time.
     *
     * @return {@code true} if an event is buffered
     */
    @Override
    public boolean hasNext() {
        if (this.curValue != null) {
            // A previously buffered event has not been consumed yet.
            return true;
        }
        if (this.curFetcher != null && this.advanceToFetcherWithData()) {
            this.curValue = this.curFetcher.next();
        } else {
            this.curValue = this.barrierQueue.poll();
        }
        return this.curValue != null;
    }

    /**
     * Moves forward through the remaining delegates until one reports data or
     * the last delegate has been reached.
     *
     * @return {@code true} if the current delegate has another event
     */
    private boolean advanceToFetcherWithData() {
        boolean hasData = this.curFetcher.hasNext();
        while (!hasData && this.curInputIndex < this.sliceMapList.size() - 1) {
            ++this.curInputIndex;
            this.initNextFetcher();
            hasData = this.curFetcher.hasNext();
        }
        return hasData;
    }

    /**
     * Returns the event buffered by the preceding {@link #hasNext()} call and
     * clears the buffer. The processed counter is incremented on every call.
     *
     * @return the buffered event, or {@code null} if nothing is buffered
     *         (no {@link #hasNext()} call since the last {@code next()}, or it returned {@code false})
     */
    @Override
    public PipelineEvent next() {
        ++this.processedNum;
        PipelineEvent result = this.curValue;
        this.curValue = null;
        return result;
    }

    /**
     * Merges the pipeline delegate's metrics into this fetcher's metrics and
     * refreshes the fetch-slice count.
     *
     * @return this fetcher's metrics object, possibly {@code null}
     */
    @Override
    public ShuffleReadMetrics getReadMetrics() {
        if (this.readMetrics != null) {
            this.readMetrics.merge(this.pipelineFetcher.getReadMetrics());
            this.readMetrics.setFetchSlices(this.totalSliceNum);
        }
        return this.readMetrics;
    }

    /**
     * Closes the pipeline delegate. Does nothing if the delegate was never
     * created, so that closing after a failed or skipped {@link #setup} does
     * not raise a {@link NullPointerException}.
     */
    @Override
    public void close() {
        if (this.pipelineFetcher != null) {
            this.pipelineFetcher.close();
        }
    }

    /**
     * Partitions the request's slices: slices with a positive record count are
     * grouped per key for the pipeline delegate, all others are turned into
     * barriers via {@link #addToBarrierQueue(ISliceMeta)}.
     *
     * <p>Sets {@code totalSliceNum} to the number of pipeline slices and, if
     * there is at least one, registers the pipeline delegate together with its
     * slice map.
     *
     * @param sliceMap input slices of the request, grouped by key
     */
    protected void splitRemoteSlices(Map<Integer, List<ISliceMeta>> sliceMap) {
        int totalPipelineSlices = 0;
        // No slice is ever classified as remote here; the count is kept only
        // because it is part of the log line below.
        final int totalRemoteSlices = 0;
        Map<Integer, List<ISliceMeta>> pipelineSliceMap = null;

        for (Map.Entry<Integer, List<ISliceMeta>> entry : sliceMap.entrySet()) {
            List<ISliceMeta> pipelineSlices = new ArrayList<>();
            for (ISliceMeta sliceMeta : entry.getValue()) {
                if (sliceMeta.getRecordNum() > 0L) {
                    pipelineSlices.add(sliceMeta);
                } else {
                    this.addToBarrierQueue(sliceMeta);
                }
            }
            if (pipelineSlices.isEmpty()) {
                continue;
            }
            totalPipelineSlices += pipelineSlices.size();
            if (pipelineSliceMap == null) {
                // Created lazily so that no delegate is registered when every slice is empty.
                pipelineSliceMap = new HashMap<>();
            }
            pipelineSliceMap.put(entry.getKey(), pipelineSlices);
        }

        this.totalSliceNum = totalPipelineSlices + totalRemoteSlices;
        if (pipelineSliceMap != null) {
            this.sliceMapList.add(pipelineSliceMap);
            this.fetcherList.add(this.pipelineFetcher);
        }
        LOG.info("{} shards:{} pipelineSlice:{} remoteSlice:{}",
            this.taskName, sliceMap.size(), totalPipelineSlices, totalRemoteSlices);
    }

    /**
     * Builds a {@link PipelineBarrier} from the slice's batch id, edge id,
     * source index, target index and record count, and appends it to the
     * barrier queue.
     *
     * @param sliceMeta slice to represent as a barrier; it is cast to {@link BaseSliceMeta},
     *                  so any other implementation fails with a {@link ClassCastException}
     */
    protected void addToBarrierQueue(ISliceMeta sliceMeta) {
        BaseSliceMeta baseSliceMeta = (BaseSliceMeta) sliceMeta;
        PipelineBarrier barrier = new PipelineBarrier(
            baseSliceMeta.getBatchId(),
            baseSliceMeta.getEdgeId(),
            sliceMeta.getSourceIndex(),
            sliceMeta.getTargetIndex(),
            sliceMeta.getRecordNum());
        this.barrierQueue.add(barrier);
    }
}

