/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.api.function.internal;

import com.antgroup.geaflow.api.context.RuntimeContext;
import com.antgroup.geaflow.api.function.RichFunction;
import com.antgroup.geaflow.api.function.io.SourceFunction;
import com.antgroup.geaflow.api.window.IWindow;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SourceFunction} that emits the elements of an in-memory collection.
 *
 * <p>The read position of each task is kept in a static map keyed by task id, so it is
 * shared by every instance of this class loaded in the same JVM.
 * NOTE(review): presumably this lets a re-created source instance resume where the
 * previous one stopped — confirm against the callers.
 *
 * @param <OUT> the type of the emitted records
 */
public class CollectionSource<OUT>
extends RichFunction
implements SourceFunction<OUT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CollectionSource.class);

    /** Next index to read, per task id. Static: shared across all instances. */
    private static final Map<Integer, Integer> readPosMap = new ConcurrentHashMap<>();

    /** Stored from the constructor; not read anywhere in this class. */
    private int batchSize;

    /** Records this instance emits; narrowed to this task's share by {@link #init(int, int)}. */
    private List<OUT> records;

    private transient RuntimeContext runtimeContext;

    /**
     * Creates a source over a copy of the given records.
     *
     * @param records   the records to emit; copied, so later changes to the argument are not seen
     * @param batchSize the batch size to remember (currently unused by this class)
     */
    public CollectionSource(Collection<OUT> records, int batchSize) {
        this.records = new ArrayList<>(records);
        this.batchSize = batchSize;
    }

    /**
     * Creates a source over a copy of the given records with a batch size of 2.
     *
     * @param records the records to emit
     */
    public CollectionSource(Collection<OUT> records) {
        this(records, 2);
    }

    /**
     * Creates a source over the given elements with a batch size of 2.
     *
     * @param collections the records to emit
     */
    @SafeVarargs // the array is only copied into a new list, never stored or written to
    public CollectionSource(OUT ... collections) {
        this((Collection<OUT>)Arrays.asList(collections));
    }

    /**
     * Remembers the runtime context; {@link #fetch} reads the task name and id from it.
     *
     * @param runtimeContext the context of the running task
     */
    @Override
    public void open(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    /**
     * Restricts this instance to its share of the records when running in parallel:
     * the record at position {@code i} is kept only if {@code i % parallel == index}.
     * When {@code parallel} is 1 the records are left untouched.
     *
     * @param parallel the divisor used to split the records
     * @param index    the remainder selecting which records this instance keeps
     */
    @Override
    public void init(int parallel, int index) {
        if (parallel == 1) {
            return;
        }
        List<OUT> allRecords = this.records;
        List<OUT> ownRecords = new ArrayList<>();
        for (int i = 0; i < allRecords.size(); ++i) {
            if (i % parallel == index) {
                ownRecords.add(allRecords.get(i));
            }
        }
        this.records = ownRecords;
    }

    /**
     * Emits records starting at the position saved for this task. Emission stops at the
     * first record that {@code window} assigns to a window other than its current one,
     * when {@code ctx.collect} returns {@code false}, or when the records are exhausted.
     * The position reached is saved back under the task id.
     *
     * @param window the window whose current id decides which records are emitted
     * @param ctx    the context that receives the emitted records
     * @return {@code true} if unread records remain after this call
     * @throws Exception declared by the interface; propagated from the window or context calls
     */
    @Override
    public boolean fetch(IWindow<OUT> window, SourceFunction.SourceContext<OUT> ctx) throws Exception {
        String taskName = this.runtimeContext.getTaskArgs().getTaskName();
        int taskId = this.runtimeContext.getTaskArgs().getTaskId();

        // Restore the position before the first log statement: the previous code logged
        // readPos while it was still unassigned, which does not compile.
        int readPos = readPosMap.getOrDefault(taskId, 0);
        LOGGER.info("taskName:{} fetch batchId:{} readPos:{}", taskName, window.windowId(), readPos);

        while (readPos < this.records.size()) {
            OUT out = this.records.get(readPos);
            // Short-circuit order matters: collect() is only called for records that
            // belong to the current window.
            boolean emitted = window.assignWindow(out) == window.windowId() && ctx.collect(out);
            if (!emitted) {
                break;
            }
            ++readPos;
        }

        LOGGER.info("taskName:{} save batchId:{} readPos:{}", taskName, window.windowId(), readPos);
        readPosMap.put(taskId, readPos);
        return readPos < this.records.size();
    }

    /** No resources to release. */
    @Override
    public void close() {
    }
}

