/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.memory;

import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipelineShard;
import com.antgroup.geaflow.shuffle.api.pipeline.buffer.PipelineSlice;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceListener;
import com.antgroup.geaflow.shuffle.api.pipeline.fetcher.PipelineSliceReader;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.message.WriterId;
import com.antgroup.geaflow.shuffle.util.SliceNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShuffleDataManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleDataManager.class);
    private static ShuffleDataManager INSTANCE;
    private final Map<WriterId, PipelineShard> entries = new ConcurrentHashMap<WriterId, PipelineShard>();
    private final Set<Long> pipelineSet = new HashSet<Long>();

    public static synchronized ShuffleDataManager getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new ShuffleDataManager();
        }
        return INSTANCE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void register(WriterId writerId, PipelineShard pipeShard) {
        LOGGER.info("{} register {} {} slices", new Object[]{pipeShard.getTaskName(), writerId, pipeShard.getSliceNum()});
        PipelineShard previousShard = this.entries.put(writerId, pipeShard);
        if (previousShard != null && pipeShard.hasData()) {
            throw new IllegalStateException("already registered:" + writerId);
        }
        Set<Long> set = this.pipelineSet;
        synchronized (set) {
            this.pipelineSet.add(writerId.getPipelineId());
        }
    }

    public PipelineShard getShard(WriterId writerID) {
        return this.entries.get(writerID);
    }

    public PipelineSlice getSlice(SliceId sliceId) {
        PipelineShard shard = this.entries.get(sliceId.getWriterId());
        if (shard == null) {
            return null;
        }
        return shard.getSlice(sliceId.getSliceIndex());
    }

    public PipelineSliceReader createSliceReader(SliceId sliceId, long startBatchId, PipelineSliceListener listener) throws IOException {
        PipelineSlice slice = this.getSlice(sliceId);
        if (slice == null) {
            throw new SliceNotFoundException(sliceId);
        }
        return slice.createSliceReader(startBatchId, listener);
    }

    public int getBlockCount() {
        int cacheSize = 0;
        for (PipelineShard block : this.entries.values()) {
            cacheSize += block.getBufferCount();
        }
        return cacheSize;
    }

    public void release(SliceId sliceId) {
        WriterId writerID = sliceId.getWriterId();
        PipelineShard shard = this.entries.get(writerID);
        if (shard != null) {
            shard.release(sliceId.getSliceIndex());
            if (shard.disposedIfNeed()) {
                shard.release();
                this.entries.remove(writerID);
                LOGGER.info("remove {} {}", (Object)shard.getTaskName(), (Object)writerID);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void release(long pipelineId) {
        Set<Long> set = this.pipelineSet;
        synchronized (set) {
            if (this.pipelineSet.contains(pipelineId)) {
                int totalShards = 0;
                Iterator<Map.Entry<WriterId, PipelineShard>> shardIterator = this.entries.entrySet().iterator();
                while (shardIterator.hasNext()) {
                    Map.Entry<WriterId, PipelineShard> entry = shardIterator.next();
                    if (entry.getKey().getPipelineId() != pipelineId) continue;
                    entry.getValue().release();
                    shardIterator.remove();
                    ++totalShards;
                }
                LOGGER.info("cleanup {} shuffle shards: {}", (Object)pipelineId, (Object)totalShards);
            }
            this.pipelineSet.remove(pipelineId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void releaseAll() {
        int totalShards = 0;
        Map<WriterId, PipelineShard> map = this.entries;
        synchronized (map) {
            for (PipelineShard shard : this.entries.values()) {
                shard.release();
                ++totalShards;
            }
            this.entries.clear();
        }
        LOGGER.info("cleanup all slices of {} shards", (Object)totalShards);
    }
}

