/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.fetcher;

import com.antgroup.geaflow.cluster.fetcher.BarrierHandler;
import com.antgroup.geaflow.cluster.fetcher.IInputMessageBuffer;
import com.antgroup.geaflow.cluster.fetcher.InitFetchRequest;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.encoder.IEncoder;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.metric.EventMetrics;
import com.antgroup.geaflow.common.metric.ShuffleReadMetrics;
import com.antgroup.geaflow.io.AbstractMessageBuffer;
import com.antgroup.geaflow.shuffle.api.reader.IShuffleReader;
import com.antgroup.geaflow.shuffle.message.FetchRequest;
import com.antgroup.geaflow.shuffle.message.PipelineBarrier;
import com.antgroup.geaflow.shuffle.message.PipelineMessage;
import com.antgroup.geaflow.shuffle.service.ShuffleManager;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineInputFetcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipelineInputFetcher.class);
    private Configuration config;
    private IShuffleReader shuffleReader;
    private InitFetchRequest initRequest;
    private List<IInputMessageBuffer<?>> fetchListeners;
    private BarrierHandler barrierHandler;
    private long pipelineId;
    private String pipelineName;

    public PipelineInputFetcher(Configuration config) {
        this.config = config;
    }

    public void init(InitFetchRequest request) {
        if (this.shuffleReader != null) {
            this.shuffleReader.close();
            this.shuffleReader = null;
        }
        this.shuffleReader = ShuffleManager.getInstance().loadShuffleReader(this.config);
        this.pipelineId = request.getPipelineId();
        this.pipelineName = request.getPipelineName();
        this.fetchListeners = request.getFetchListeners();
        this.barrierHandler = new BarrierHandler(request.getTaskId(), request.getTotalSliceNum());
        this.initRequest = request;
        for (IEncoder<?> encoder : this.initRequest.getEncoders().values()) {
            if (encoder == null) continue;
            encoder.init(this.config);
        }
    }

    public void fetch(long startWindowId, long windowCount) {
        LOGGER.info("task {} start fetch windowId:{} count:{}", new Object[]{this.initRequest.getTaskId(), startWindowId, windowCount});
        long targetWindowId = startWindowId + windowCount - 1L;
        this.fetch(this.buildFetchRequest(targetWindowId));
    }

    public void cancel() {
    }

    public void close() {
        try {
            if (this.shuffleReader != null) {
                this.shuffleReader.close();
            }
        }
        catch (Throwable t) {
            LOGGER.error(t.getMessage(), t);
            throw new GeaflowRuntimeException(t);
        }
    }

    protected FetchRequest buildFetchRequest(long targetBatchId) {
        FetchRequest request = new FetchRequest();
        request.setPipelineId(this.pipelineId);
        request.setPipelineName(this.pipelineName);
        request.setTaskId(this.initRequest.getTaskId());
        request.setTaskIndex(this.initRequest.getTaskIndex());
        request.setTaskName(this.initRequest.getTaskName());
        request.setVertexId(this.initRequest.getVertexId());
        request.setTargetBatchId(targetBatchId);
        request.setInputStreamMap(this.initRequest.getInputStreamMap());
        request.setDescriptor(this.initRequest.getDescriptor());
        request.setInputSlices(this.initRequest.getInputSlices());
        request.setEncoders(this.initRequest.getEncoders());
        return request;
    }

    /*
     * Unable to fully structure code
     */
    protected void fetch(FetchRequest request) {
        this.shuffleReader.fetch(request);
lbl2:
        // 3 sources

        try {
            while (this.shuffleReader.hasNext()) {
                block5: {
                    event = this.shuffleReader.next();
                    if (event == null) continue;
                    if (!(event instanceof PipelineMessage)) break block5;
                    message = (PipelineMessage)event;
                    for (IInputMessageBuffer<?> listener : this.fetchListeners) {
                        listener.onMessage(message);
                    }
                    ** GOTO lbl2
                }
                barrier = (PipelineBarrier)event;
                if (!this.barrierHandler.checkCompleted(barrier)) continue;
                windowId = barrier.getWindowId();
                windowCount = this.barrierHandler.getTotalWindowCount();
                this.handleMetrics();
                for (IInputMessageBuffer<?> listener : this.fetchListeners) {
                    listener.onBarrier(windowId, windowCount);
                }
                ** GOTO lbl2
            }
            PipelineInputFetcher.LOGGER.info("task {} worker reader finish fetch windowId {}", (Object)request.getTaskId(), (Object)request.getTargetBatchId());
        }
        catch (Throwable e) {
            PipelineInputFetcher.LOGGER.error("fetcher encounters unexpected exception: {}", (Object)e.getMessage(), (Object)e);
            throw new GeaflowRuntimeException(e);
        }
    }

    private void handleMetrics() {
        ShuffleReadMetrics shuffleReadMetrics = this.shuffleReader.getShuffleReadMetrics();
        for (IInputMessageBuffer<?> listener : this.fetchListeners) {
            EventMetrics eventMetrics = ((AbstractMessageBuffer)listener).getEventMetrics();
            eventMetrics.addShuffleReadBytes(shuffleReadMetrics.getDecodeBytes());
        }
    }
}

