/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.fetcher;

import com.antgroup.geaflow.cluster.fetcher.IFetchRequest;
import com.antgroup.geaflow.cluster.fetcher.InitFetchRequest;
import com.antgroup.geaflow.cluster.fetcher.PipelineInputFetcher;
import com.antgroup.geaflow.cluster.fetcher.ReFetchRequest;
import com.antgroup.geaflow.cluster.task.runner.AbstractTaskRunner;
import com.antgroup.geaflow.common.config.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetcherRunner
extends AbstractTaskRunner<IFetchRequest> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FetcherRunner.class);
    private final PipelineInputFetcher fetcher;

    public FetcherRunner(Configuration configuration) {
        this.fetcher = new PipelineInputFetcher(configuration);
    }

    @Override
    protected void process(IFetchRequest task) {
        if (task instanceof InitFetchRequest) {
            this.fetcher.init((InitFetchRequest)task);
        } else {
            ReFetchRequest request = (ReFetchRequest)task;
            this.fetcher.fetch(request.getStartBatchId(), request.getWindowCount());
        }
    }

    @Override
    public void interrupt() {
        LOGGER.info("cancel fetcher runner");
        this.fetcher.cancel();
    }

    @Override
    public void shutdown() {
        super.shutdown();
        this.fetcher.close();
    }
}

