/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.fetcher;

import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.shuffle.message.PipelineBarrier;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the {@link PipelineBarrier}s received for each window and reports when a window has
 * collected as many barriers as there are input slices.
 *
 * <p>Barriers are grouped by window id in an in-memory cache. Each group is a {@link Set}, so
 * whether two barriers count separately depends on {@code PipelineBarrier}'s
 * {@code equals}/{@code hashCode} (not visible from this class).
 *
 * <p>This class performs no synchronization of its own.
 *
 * <p>No explicit {@code serialVersionUID} is declared, so the field declarations are
 * intentionally left as they were to keep the default serialized form stable.
 */
public class BarrierHandler implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(BarrierHandler.class);

    /** Number of barriers a window must collect before it is considered complete. */
    private final int inputSliceNum;
    /** Barriers received so far, keyed by window id; an entry is removed once its window completes. */
    private final Map<Long, Set<PipelineBarrier>> barrierCache;
    /** Id of the most recently completed window, or {@code -1} if none has completed yet. */
    private long finishedWindowId;
    /** Sum of {@code getCount()} over the barriers of the most recently completed window. */
    private long totalWindowCount;
    /** Task id; only used in the error message raised by {@link #checkCompleted}. */
    private int taskId;

    /**
     * Creates a handler with an empty barrier cache and no finished window.
     *
     * @param taskId   id reported in error messages
     * @param sliceNum number of barriers required to complete a window
     */
    public BarrierHandler(int taskId, int sliceNum) {
        this.inputSliceNum = sliceNum;
        this.barrierCache = new HashMap<>();
        this.finishedWindowId = -1L;
        this.totalWindowCount = 0L;
        this.taskId = taskId;
    }

    /**
     * Records {@code barrier} and reports whether its window is now complete.
     *
     * <p>A window completes when the number of cached barriers for it equals the configured
     * slice number. On completion the window's entry is dropped from the cache, the window id
     * becomes the last finished window id, and the total window count is replaced by the sum of
     * {@code getCount()} over that window's barriers.
     *
     * @param barrier the barrier that was just received
     * @return {@code true} if this barrier completed its window, {@code false} otherwise
     * @throws GeaflowRuntimeException if the barrier's window id is not greater than the last
     *                                 finished window id
     */
    public boolean checkCompleted(PipelineBarrier barrier) {
        long windowId = barrier.getWindowId();
        if (windowId <= this.finishedWindowId) {
            throw new GeaflowRuntimeException(String.format(
                "illegal state: taskId %s window %s has finished, last finished window is: %s",
                this.taskId, windowId, this.finishedWindowId));
        }

        Set<PipelineBarrier> inputBarriers =
            this.barrierCache.computeIfAbsent(windowId, key -> new HashSet<>());
        inputBarriers.add(barrier);
        if (inputBarriers.size() != this.inputSliceNum) {
            return false;
        }

        // The window is complete: detach its barriers from the cache before publishing results.
        this.barrierCache.remove(windowId);
        this.finishedWindowId = windowId;
        this.totalWindowCount = inputBarriers.stream().mapToLong(PipelineBarrier::getCount).sum();
        return true;
    }

    /**
     * Returns the count computed for the most recently completed window.
     *
     * @return the summed barrier count of the last completed window, or {@code 0} if no window
     *         has completed yet
     */
    public long getTotalWindowCount() {
        return this.totalWindowCount;
    }
}

