/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.migrator.core.runner;

import com.oceanbase.tools.migrator.common.enums.TaskStatus;
import com.oceanbase.tools.migrator.core.data.TransRows;
import com.oceanbase.tools.migrator.core.handler.DataReadIterator;
import com.oceanbase.tools.migrator.core.meta.TaskMeta;
import com.oceanbase.tools.migrator.core.runner.CommonRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runner that reads batches of rows for a task from a {@link DataReadIterator} and forwards
 * each batch to the write tunnel inherited from {@link CommonRunner}.
 */
public class ReadDataFromTaskRunner
extends CommonRunner<TaskMeta, TransRows> {
    private static final Logger log = LoggerFactory.getLogger(ReadDataFromTaskRunner.class);

    /**
     * Drains the data iterator built for {@code taskMeta}, registering every non-null batch as
     * pending on the task and then putting it into the write tunnel.
     *
     * <p>The loop exits early when the current thread is interrupted or when the owning job
     * reports that it should stop. Only when the iterator reports its end and the task is still
     * {@link TaskStatus#RUNNING} is the task moved to {@link TaskStatus#PRODUCER_FINISHED}.
     *
     * @param taskMeta the task to read data for; also receives the pending batches and the
     *                 status update
     * @throws Exception if building the iterator, reading a batch, or putting a batch into the
     *                   write tunnel fails
     */
    @Override
    void run(TaskMeta taskMeta) throws Exception {
        DataReadIterator iter = DataReadIterator.buildDataReadIterator(taskMeta);
        while (!iter.isEnd()) {
            // Checked without clearing the interrupt flag, so callers can still observe it.
            if (Thread.currentThread().isInterrupted()) {
                log.info("job isInterrupted, this thread will exit");
                break;
            }
            if (taskMeta.getJobMeta().isToStop()) {
                log.info("job is to stop");
                break;
            }
            TransRows transRows = iter.getNext();
            // NOTE(review): when writeTunnel is null the batch has already been fetched and is
            // simply dropped; the iterator keeps advancing. Confirm this is intended.
            if (transRows == null || this.writeTunnel == null) {
                continue;
            }
            // Register the batch as pending before handing it to the tunnel.
            taskMeta.addPendingTransRows(transRows);
            this.writeTunnel.put(transRows);
        }
        // After an early break the iterator is normally not at its end, so the status is left
        // untouched in that case.
        if (iter.isEnd() && taskMeta.getTaskStatus() == TaskStatus.RUNNING) {
            taskMeta.setTaskStatus(TaskStatus.PRODUCER_FINISHED);
            // Parameterized logging: the message is only rendered when INFO is enabled.
            log.info("all data read, taskMeta = {}", taskMeta);
        }
    }
}

