/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.service.task;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.logging.SetLogCategory;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.optimization.FrequencyMap;
import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.favorite.AbstractAsyncTask;
import org.apache.kylin.metadata.favorite.AccelerateRuleUtil;
import org.apache.kylin.metadata.favorite.AsyncAccelerationTask;
import org.apache.kylin.metadata.favorite.AsyncTaskManager;
import org.apache.kylin.metadata.favorite.QueryHistoryIdOffset;
import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.apache.kylin.metadata.query.QueryHistory;
import org.apache.kylin.metadata.query.QueryHistoryInfo;
import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO;
import org.apache.kylin.rest.service.IUserGroupService;
import org.apache.kylin.rest.service.QuerySmartSupporter;
import org.apache.kylin.rest.util.SpringContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryHistoryTaskScheduler {
    /** Class-wide SLF4J logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(QueryHistoryTaskScheduler.class);
    /** Scheduled pool that runs the periodic runners; assigned in init() and null before that. */
    private ScheduledExecutorService taskScheduler;
    /** Set to true at the end of init(); nothing in this class sets it back to false. */
    private boolean hasStarted;
    /** Query-history store obtained in the constructor; non-private and marked as visible for tests. */
    @VisibleForTesting
    RDBMSQueryHistoryDAO queryHistoryDAO;
    /** Used by the acceleration runner to pick matched candidates out of fetched query histories. */
    AccelerateRuleUtil accelerateRuleUtil;
    /** Name of the project this scheduler serves; also the key in INSTANCE_MAP. */
    private final String project;
    /** Optional Spring bean notified of matched query histories; stays null without an application context. */
    private QuerySmartSupporter querySmartSupporter;
    /** Epoch id captured in init() (outside UT env) and re-checked before each batch run. */
    private long epochId;
    /** Spring bean used to resolve a submitter's user groups; stays null without an application context. */
    private IUserGroupService userGroupService;
    /** Non-manual acceleration runner scheduled periodically by init(). */
    private final QueryHistoryAccelerateRunner queryHistoryAccelerateRunner;
    /** Statistics/metadata update runner scheduled periodically by init(). */
    private final QueryHistoryMetaUpdateRunner queryHistoryMetaUpdateRunner;
    /** Registry of schedulers keyed by project name; filled lazily by getInstance(String). */
    private static final Map<String, QueryHistoryTaskScheduler> INSTANCE_MAP = Maps.newConcurrentMap();

    /**
     * Builds the scheduler for a single project without starting any background work;
     * call {@link #init()} to actually schedule the runners.
     *
     * <p>The user-group service and the smart supporter are looked up from Spring only when
     * an application context is available; otherwise both fields remain {@code null}.
     *
     * @param project name of the project this scheduler is bound to
     */
    public QueryHistoryTaskScheduler(String project) {
        this.project = project;
        this.queryHistoryDAO = RDBMSQueryHistoryDAO.getInstance();
        this.accelerateRuleUtil = new AccelerateRuleUtil();

        if (this.userGroupService == null) {
            if (SpringContext.getApplicationContext() != null) {
                Object groupServiceBean = SpringContext.getApplicationContext().getBean("userGroupService");
                this.userGroupService = (IUserGroupService) groupServiceBean;
            }
        }

        this.queryHistoryAccelerateRunner = new QueryHistoryAccelerateRunner(false);
        this.queryHistoryMetaUpdateRunner = new QueryHistoryMetaUpdateRunner();

        if (this.querySmartSupporter == null) {
            if (SpringContext.getApplicationContext() != null) {
                this.querySmartSupporter = (QuerySmartSupporter) SpringContext.getBean(QuerySmartSupporter.class);
            }
        }

        // Emit the creation message under the "schedule" log category.
        try (SetLogCategory ignored = new SetLogCategory("schedule")) {
            log.debug("New QueryHistoryAccelerateScheduler created by project {}", project);
        }
    }

    /**
     * Returns the scheduler registered for {@code project}, creating and registering
     * a new (not yet started) one when the registry has no entry for it.
     *
     * @param project project name used as the registry key
     * @return the existing or newly created scheduler
     */
    public static QueryHistoryTaskScheduler getInstance(String project) {
        return INSTANCE_MAP.computeIfAbsent(project, name -> new QueryHistoryTaskScheduler(name));
    }

    /**
     * Starts the background work for this project.
     *
     * <p>Records the project's current epoch id (skipped in UT env), creates a one-thread
     * scheduled pool, schedules the acceleration runner and the meta-update runner with
     * fixed delays (in minutes, taken from the Kylin config, first run immediately), marks
     * the scheduler as started and resets the project's acceleration tag map.
     */
    public void init() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        ProjectInstance projectInstance = NProjectManager.getInstance(config).getProject(this.project);
        EpochManager epochManager = EpochManager.getInstance();
        boolean unitTestEnv = config.isUTEnv();
        if (!unitTestEnv) {
            // Remember the epoch so later runs can detect that ownership of the project changed.
            this.epochId = epochManager.getEpoch(projectInstance.getName()).getEpochId();
        }

        String workerName = "QueryHistoryWorker(project:" + this.project + ")";
        this.taskScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory(workerName));

        long accelerateInterval = config.getQueryHistoryAccelerateInterval();
        this.taskScheduler.scheduleWithFixedDelay(this.queryHistoryAccelerateRunner, 0L, accelerateInterval,
                TimeUnit.MINUTES);
        long metaUpdateInterval = config.getQueryHistoryStatMetaUpdateInterval();
        this.taskScheduler.scheduleWithFixedDelay(this.queryHistoryMetaUpdateRunner, 0L, metaUpdateInterval,
                TimeUnit.MINUTES);

        this.hasStarted = true;
        AsyncTaskManager.resetAccelerationTagMap(this.project);
        log.info("Query history task scheduler is started for [{}] ", this.project);
    }

    /**
     * Submits a one-off run of {@code runner} on this scheduler's pool.
     *
     * <p>Despite the method name, the task is scheduled with a fixed 10-second delay.
     * The pool only exists after {@link #init()}, so calling this earlier fails with a
     * {@link NullPointerException}.
     *
     * @param runner the task to execute once
     * @return the future handle returned by the underlying scheduled pool
     */
    public Future scheduleImmediately(QueryHistoryTask runner) {
        final long delaySeconds = 10L;
        return this.taskScheduler.schedule(runner, delaySeconds, TimeUnit.SECONDS);
    }

    /**
     * Tells whether {@link #init()} has completed on this instance.
     *
     * @return {@code true} once init() has set the started flag
     */
    public boolean hasStarted() {
        return hasStarted;
    }

    /**
     * Logs the shutdown and force-stops the scheduled pool if one was ever created.
     * Does nothing beyond logging when {@link #init()} was never called.
     */
    private void shutdown() {
        log.info("Shutting down QueryHistoryAccelerateScheduler for [{}] ....", this.project);
        if (this.taskScheduler == null) {
            return;
        }
        ExecutorServiceUtil.forceShutdown(this.taskScheduler);
    }

    /**
     * Unregisters and shuts down the scheduler of {@code project}, if one is registered.
     * A project without a registered scheduler is silently ignored.
     *
     * @param project project whose scheduler should be stopped
     */
    public static synchronized void shutdownByProject(String project) {
        QueryHistoryTaskScheduler registered = QueryHistoryTaskScheduler.getInstanceByProject(project);
        if (registered == null) {
            return;
        }
        // Drop the registry entry first so later getInstance() calls build a fresh scheduler.
        INSTANCE_MAP.remove(project);
        registered.shutdown();
    }

    /**
     * Reads the project's {@code "async_acceleration_task"} entry from the async task manager
     * and reports its already-running flag.
     *
     * @return the value of {@code isAlreadyRunning()} on the project's async acceleration task
     */
    public boolean isInterruptByUser() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        AsyncTaskManager taskManager = AsyncTaskManager.getInstance(config, this.getProject());
        AsyncAccelerationTask accelerationTask = (AsyncAccelerationTask) taskManager.get("async_acceleration_task");
        return accelerationTask.isAlreadyRunning();
    }

    /**
     * Looks up the registered scheduler for {@code project} without creating one.
     *
     * @param project project name used as the registry key
     * @return the registered scheduler, or {@code null} when none is registered
     */
    private static synchronized QueryHistoryTaskScheduler getInstanceByProject(String project) {
        QueryHistoryTaskScheduler registered = INSTANCE_MAP.get(project);
        return registered;
    }

    /**
     * @return the name of the project this scheduler is bound to
     */
    @Generated
    public String getProject() {
        return project;
    }

    /**
     * Mutable accumulator of query hits for one dataflow: a total hit counter plus a
     * per-layout frequency map keyed by layout id.
     */
    private static class DataflowHitCount {
        /** Hit frequencies keyed by layout id. */
        Map<Long, FrequencyMap> layoutHits = Maps.newHashMap();
        /** Number of hits counted for the dataflow as a whole. */
        int dataflowHit;

        @Generated
        public DataflowHitCount() {
        }

        /** @return the per-layout hit frequencies */
        @Generated
        public Map<Long, FrequencyMap> getLayoutHits() {
            return layoutHits;
        }

        /** @return the total hit count of the dataflow */
        @Generated
        public int getDataflowHit() {
            return dataflowHit;
        }

        /** @param layoutHits replacement per-layout hit frequencies */
        @Generated
        public void setLayoutHits(Map<Long, FrequencyMap> layoutHits) {
            this.layoutHits = layoutHits;
        }

        /** @param dataflowHit replacement total hit count */
        @Generated
        public void setDataflowHit(int dataflowHit) {
            this.dataflowHit = dataflowHit;
        }

        /**
         * Two instances are equal when both accept each other via {@link #canEqual(Object)},
         * their layout-hit maps are equal (or both null) and their hit counters match.
         */
        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof DataflowHitCount)) {
                return false;
            }
            DataflowHitCount that = (DataflowHitCount) o;
            if (!that.canEqual(this)) {
                return false;
            }
            Map<Long, FrequencyMap> mine = this.getLayoutHits();
            Map<Long, FrequencyMap> theirs = that.getLayoutHits();
            boolean sameLayoutHits = mine == null ? theirs == null : mine.equals(theirs);
            if (!sameLayoutHits) {
                return false;
            }
            return this.getDataflowHit() == that.getDataflowHit();
        }

        /** Hook that lets subclasses opt out of equality with this type. */
        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof DataflowHitCount;
        }

        /** Hash over the layout-hit map (43 when null) and the hit counter, using multiplier 59. */
        @Generated
        public int hashCode() {
            final int prime = 59;
            Map<Long, FrequencyMap> hits = this.getLayoutHits();
            int layoutHash = hits == null ? 43 : hits.hashCode();
            // Seed is 1, so the first round is 1 * prime + layoutHash.
            int hash = prime + layoutHash;
            hash = hash * prime + this.getDataflowHit();
            return hash;
        }

        @Generated
        public String toString() {
            StringBuilder text = new StringBuilder("QueryHistoryTaskScheduler.DataflowHitCount(layoutHits=");
            text.append(this.getLayoutHits());
            text.append(", dataflowHit=").append(this.getDataflowHit()).append(")");
            return text.toString();
        }
    }

    /**
     * Base class of the periodic query-history jobs of this scheduler.
     *
     * <p>Subclasses supply a name, a way to fetch the next batch of query histories and the
     * actual {@link #work()}; this class contributes the batching loop, the one-time offset
     * reset and the exception-safe {@link #run()} wrapper.
     */
    private abstract class QueryHistoryTask
    implements Runnable {
        /** True until the stored offsets have been checked once against the current max id. */
        private volatile boolean needResetOffset = true;

        private QueryHistoryTask() {
        }

        /** @return short task name used in log and error messages */
        protected abstract String name();

        /**
         * Checks the stored offsets against the current maximum query-history id, but only
         * while the reset is still pending and the given fetch result is empty.
         *
         * @param queryHistories the batch that was just fetched
         */
        protected void resetIdOffset(List<QueryHistory> queryHistories) {
            if (!this.needResetOffset || !CollectionUtils.isEmpty(queryHistories)) {
                return;
            }
            long currentMaxId = QueryHistoryTaskScheduler.this.queryHistoryDAO
                    .getQueryHistoryMaxId(QueryHistoryTaskScheduler.this.project);
            this.resetIdOffset(currentMaxId);
        }

        /**
         * In a transaction, pulls both stored offsets back to {@code maxId} when either of
         * them lies beyond it, then clears the pending-reset flag.
         */
        private void resetIdOffset(long maxId) {
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                QueryHistoryIdOffsetManager offsetManager = QueryHistoryIdOffsetManager
                        .getInstance(KylinConfig.getInstanceFromEnv(), QueryHistoryTaskScheduler.this.project);
                QueryHistoryIdOffset stored = offsetManager.get();
                boolean offsetBeyondMax = stored.getOffset() > maxId || stored.getStatMetaUpdateOffset() > maxId;
                if (offsetBeyondMax) {
                    stored.setOffset(maxId);
                    stored.setStatMetaUpdateOffset(maxId);
                    offsetManager.save(stored);
                }
                this.needResetOffset = false;
                return 0;
            }, QueryHistoryTaskScheduler.this.project);
        }

        /**
         * Repeatedly fetches batches of query histories and hands each non-empty batch to
         * {@code consumer}, until a batch comes back smaller than {@code batchSize}, at least
         * {@code maxSize} histories have been fetched, or the task reports an interruption.
         *
         * <p>Outside UT env the epoch id is verified first; on mismatch the project's
         * scheduler is shut down and nothing is processed.
         *
         * @param batchSize number of histories requested per fetch; must be positive
         * @param maxSize   upper bound on histories fetched in one call; must be &gt;= batchSize
         * @param consumer  callback that processes one batch
         * @throws IllegalArgumentException if the size arguments are invalid
         */
        public void batchHandle(int batchSize, int maxSize, Consumer<List<QueryHistory>> consumer) {
            boolean sizesValid = batchSize > 0 && maxSize >= batchSize;
            if (!sizesValid) {
                String message = String.format(Locale.ROOT, "%s task, batch size: %d , maxsize: %d is illegal",
                        this.name(), batchSize, maxSize);
                throw new IllegalArgumentException(message);
            }

            if (!KylinConfig.getInstanceFromEnv().isUTEnv()) {
                boolean epochStillValid = EpochManager.getInstance().checkEpochId(
                        QueryHistoryTaskScheduler.this.epochId, QueryHistoryTaskScheduler.this.project);
                if (!epochStillValid) {
                    QueryHistoryTaskScheduler.shutdownByProject(QueryHistoryTaskScheduler.this.project);
                    return;
                }
            }

            int fetchedTotal = 0;
            while (true) {
                List<QueryHistory> batch = this.getQueryHistories(batchSize);
                fetchedTotal += batch.size();
                if (this.isInterrupted()) {
                    break;
                }
                if (!batch.isEmpty()) {
                    consumer.accept(batch);
                }
                log.debug("{} handled {} query history", this.name(), batch.size());
                // A short batch means the source is drained; maxSize caps the work of one run.
                if (batch.size() < batchSize || fetchedTotal >= maxSize) {
                    break;
                }
            }
        }

        /** @return whether batch processing should stop early; {@code false} unless overridden */
        protected boolean isInterrupted() {
            return false;
        }

        /**
         * Fetches the next batch of query histories for this task.
         *
         * @param var1 maximum number of histories to fetch
         */
        protected abstract List<QueryHistory> getQueryHistories(int var1);

        /**
         * Runs {@link #work()} under the "schedule" log category and logs (rather than
         * propagates) any {@link Exception} it throws.
         */
        @Override
        public void run() {
            try (SetLogCategory ignored = new SetLogCategory("schedule")) {
                this.work();
            } catch (Exception e) {
                log.warn("QueryHistory {}  process failed of project({})", this.name(),
                        QueryHistoryTaskScheduler.this.project, e);
            }
        }

        /** Performs one execution of the task. */
        protected abstract void work();
    }

    /**
     * Task that feeds new query histories to the acceleration rules.
     *
     * <p>Each run reads histories after the stored acceleration offset, finds the matched
     * candidates, persists the updated history infos, notifies the smart supporter (when
     * present) and advances the stored offset to the largest processed id.
     */
    public class QueryHistoryAccelerateRunner
    extends QueryHistoryTask {
        /** Whether this runner was created for a manual trigger rather than the periodic schedule. */
        private final boolean isManual;

        /**
         * @param isManual {@code true} for a manually triggered run, {@code false} for the scheduled one
         */
        public QueryHistoryAccelerateRunner(boolean isManual) {
            this.isManual = isManual;
        }

        @Override
        protected String name() {
            return "queryAcc";
        }

        /**
         * A non-manual run stops early when the project's scheduler reports
         * {@code isInterruptByUser()}; a manual run is never interrupted here.
         *
         * <p>NOTE(review): the scheduler is re-resolved through {@code getInstance}, which
         * registers a new instance if the project's entry was removed in the meantime —
         * confirm whether using the enclosing instance directly would be preferable.
         */
        @Override
        protected boolean isInterrupted() {
            return !this.isManual() && QueryHistoryTaskScheduler.getInstance(QueryHistoryTaskScheduler.this.project).isInterruptByUser();
        }

        /**
         * Fetches up to {@code batchSize} histories after the stored acceleration offset and
         * gives the base class the chance to reset stale offsets when nothing was found.
         */
        @Override
        protected List<QueryHistory> getQueryHistories(int batchSize) {
            QueryHistoryIdOffsetManager qhIdOffsetManager = QueryHistoryIdOffsetManager.getInstance(KylinConfig.getInstanceFromEnv(), QueryHistoryTaskScheduler.this.project);
            List<QueryHistory> queryHistoryList = QueryHistoryTaskScheduler.this.queryHistoryDAO.queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getOffset(), batchSize, QueryHistoryTaskScheduler.this.project);
            this.resetIdOffset(queryHistoryList);
            return queryHistoryList;
        }

        /**
         * Runs one acceleration pass; skipped entirely for projects in expert mode.
         * A manual run is capped at a single batch, a scheduled run at the configured max size.
         */
        @Override
        public void work() {
            if (NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(QueryHistoryTaskScheduler.this.project).isExpertMode()) {
                log.info("Skip QueryHistoryAccelerateRunner job, project [{}].", QueryHistoryTaskScheduler.this.project);
                return;
            }
            log.info("Start QueryHistoryAccelerateRunner job, project [{}].", QueryHistoryTaskScheduler.this.project);
            int batchSize = KylinConfig.getInstanceFromEnv().getQueryHistoryAccelerateBatchSize();
            int maxSize = this.isManual() ? KylinConfig.getInstanceFromEnv().getQueryHistoryAccelerateBatchSize() : KylinConfig.getInstanceFromEnv().getQueryHistoryAccelerateMaxSize();
            this.batchHandle(batchSize, maxSize, this::accelerateAndUpdateMetadata);
            log.info("End QueryHistoryAccelerateRunner job, project [{}].", QueryHistoryTaskScheduler.this.project);
        }

        /**
         * Processes one batch: matches candidates, stores the updated history infos, notifies
         * the smart supporter and moves the acceleration offset to the batch's largest id.
         */
        private void accelerateAndUpdateMetadata(List<QueryHistory> queryHistories) {
            if (CollectionUtils.isEmpty(queryHistories)) {
                return;
            }
            // NOTE(review): element types of these two lists are not visible in this file, so they stay raw.
            ArrayList idToQHInfoList = Lists.newArrayList();
            Map<String, Set<String>> submitterToGroups = this.getUserToGroups(queryHistories);
            List matchedCandidate = QueryHistoryTaskScheduler.this.accelerateRuleUtil.findMatchedCandidate(QueryHistoryTaskScheduler.this.project, queryHistories, submitterToGroups, (List)idToQHInfoList);
            QueryHistoryTaskScheduler.this.queryHistoryDAO.batchUpdateQueryHistoriesInfo((List)idToQHInfoList);
            if (QueryHistoryTaskScheduler.this.querySmartSupporter != null) {
                QueryHistoryTaskScheduler.this.querySmartSupporter.onMatchQueryHistory(QueryHistoryTaskScheduler.this.project, matchedCandidate, this.isManual());
            }
            long maxId = 0L;
            for (QueryHistory queryHistory : queryHistories) {
                if (queryHistory.getId() <= maxId) continue;
                maxId = queryHistory.getId();
            }
            this.updateIdOffset(maxId);
        }

        /**
         * Resolves the user groups of every distinct submitter in the batch.
         *
         * <p>Histories without a query-history info are ignored. The group service is asked
         * at most once per submitter: a plain {@code putIfAbsent(key, service.call())} would
         * evaluate the service call for every history even when the submitter is already
         * mapped, so the presence check is done before the lookup.
         *
         * @param queryHistories the batch being processed
         * @return map from submitter name to that submitter's groups
         */
        protected Map<String, Set<String>> getUserToGroups(List<QueryHistory> queryHistories) {
            Map<String, Set<String>> submitterToGroups = new HashMap<>();
            for (QueryHistory qh : queryHistories) {
                QueryHistoryInfo queryHistoryInfo = qh.getQueryHistoryInfo();
                if (queryHistoryInfo == null) {
                    continue;
                }
                String querySubmitter = qh.getQuerySubmitter();
                if (submitterToGroups.containsKey(querySubmitter)) {
                    // Already resolved for an earlier history of the same submitter.
                    continue;
                }
                submitterToGroups.put(querySubmitter, QueryHistoryTaskScheduler.this.userGroupService.listUserGroups(querySubmitter));
            }
            return submitterToGroups;
        }

        /** Persists {@code maxId} as the new acceleration offset inside a transaction. */
        private void updateIdOffset(long maxId) {
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                KylinConfig config = KylinConfig.getInstanceFromEnv();
                QueryHistoryIdOffset queryHistoryIdOffset = QueryHistoryIdOffsetManager.getInstance(KylinConfig.getInstanceFromEnv(), QueryHistoryTaskScheduler.this.project).get();
                queryHistoryIdOffset.setOffset(maxId);
                QueryHistoryIdOffsetManager.getInstance(config, QueryHistoryTaskScheduler.this.project).save(queryHistoryIdOffset);
                return 0;
            }, QueryHistoryTaskScheduler.this.project);
        }

        /** @return whether this runner was created for a manual trigger */
        @Generated
        public boolean isManual() {
            return this.isManual;
        }
    }

    /**
     * Task that turns new query histories into usage statistics.
     *
     * <p>Each batch is read from the stored stat-meta offset, aggregated into dataflow/layout
     * hit counts, per-model last query times and snapshot hit counts, and then written back
     * together with the advanced offset in a single transaction.
     */
    public class QueryHistoryMetaUpdateRunner
    extends QueryHistoryTask {
        @Override
        protected String name() {
            return "metaUpdate";
        }

        /**
         * Fetches up to {@code batchSize} histories after the stored stat-meta offset and
         * gives the base class the chance to reset stale offsets when nothing was found.
         */
        @Override
        protected List<QueryHistory> getQueryHistories(int batchSize) {
            QueryHistoryIdOffsetManager qhIdOffsetManager = QueryHistoryIdOffsetManager.getInstance(KylinConfig.getInstanceFromEnv(), QueryHistoryTaskScheduler.this.project);
            List<QueryHistory> queryHistoryList = QueryHistoryTaskScheduler.this.queryHistoryDAO.queryQueryHistoriesByIdOffset(qhIdOffsetManager.get().getStatMetaUpdateOffset(), batchSize, QueryHistoryTaskScheduler.this.project);
            this.resetIdOffset(queryHistoryList);
            return queryHistoryList;
        }

        /** Runs the batching loop with the configured stat-meta batch size and max size. */
        @Override
        public void work() {
            int maxSize = KylinConfig.getInstanceFromEnv().getQueryHistoryStatMetaUpdateMaxSize();
            int batchSize = KylinConfig.getInstanceFromEnv().getQueryHistoryStatMetaUpdateBatchSize();
            this.batchHandle(batchSize, maxSize, this::updateStatMeta);
        }

        /** Aggregates one batch in memory and persists the result via {@link #updateMetadata}. */
        private void updateStatMeta(List<QueryHistory> queryHistories) {
            long maxId = 0L;
            Map<String, Long> modelsLastQueryTime = Maps.newHashMap();
            Map<String, DataflowHitCount> dfHitCountMap = this.collectDataflowHitCount(queryHistories);
            for (QueryHistory queryHistory : queryHistories) {
                this.collectModelLastQueryTime(queryHistory, modelsLastQueryTime);
                if (queryHistory.getId() <= maxId) continue;
                maxId = queryHistory.getId();
            }
            Map<TableExtDesc, Integer> hitSnapshotCountMap = this.collectSnapshotHitCount(queryHistories);
            this.updateMetadata(dfHitCountMap, modelsLastQueryTime, maxId, hitSnapshotCountMap);
        }

        /**
         * Writes all aggregated statistics and the new stat-meta offset in one transaction.
         *
         * @param dfHitCountMap        hit counts keyed by model id
         * @param modelsLastQueryTime  latest query time keyed by model id
         * @param maxId                largest query-history id of the batch; becomes the new offset
         * @param hitSnapshotCountMap  snapshot hit counts keyed by table ext
         */
        private void updateMetadata(Map<String, DataflowHitCount> dfHitCountMap, Map<String, Long> modelsLastQueryTime, Long maxId, Map<TableExtDesc, Integer> hitSnapshotCountMap) {
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                KylinConfig config = KylinConfig.getInstanceFromEnv();
                this.incQueryHitCount(dfHitCountMap, QueryHistoryTaskScheduler.this.project);
                this.updateLastQueryTime(modelsLastQueryTime, QueryHistoryTaskScheduler.this.project);
                QueryHistoryIdOffset queryHistoryIdOffset = QueryHistoryIdOffsetManager.getInstance(KylinConfig.getInstanceFromEnv(), QueryHistoryTaskScheduler.this.project).get();
                queryHistoryIdOffset.setStatMetaUpdateOffset(maxId.longValue());
                QueryHistoryIdOffsetManager.getInstance(config, QueryHistoryTaskScheduler.this.project).save(queryHistoryIdOffset);
                this.incQueryHitSnapshotCount(hitSnapshotCountMap, QueryHistoryTaskScheduler.this.project);
                return 0;
            }, QueryHistoryTaskScheduler.this.project);
        }

        /**
         * Counts, per model id, how often the batch hit the dataflow and each of its layouts.
         * Only realizations accepted by {@link #isValidRealization} are counted.
         */
        private Map<String, DataflowHitCount> collectDataflowHitCount(List<QueryHistory> queryHistories) {
            Map<String, DataflowHitCount> result = Maps.newHashMap();
            for (QueryHistory queryHistory : queryHistories) {
                List<NativeQueryRealization> realizations = queryHistory.transformRealizations(QueryHistoryTaskScheduler.this.project);
                if (CollectionUtils.isEmpty(realizations)) {
                    continue;
                }
                List<NativeQueryRealization> realizationList = realizations.stream().filter(this::isValidRealization).collect(Collectors.toList());
                for (NativeQueryRealization realization : realizationList) {
                    // computeIfAbsent returns the existing or freshly created entry, so no second lookup is needed.
                    DataflowHitCount hitCount = result.computeIfAbsent(realization.getModelId(), k -> new DataflowHitCount());
                    ++hitCount.dataflowHit;
                    hitCount.getLayoutHits()
                            .computeIfAbsent(realization.getLayoutId(), k -> new FrequencyMap())
                            .incFrequency(queryHistory.getQueryTime());
                }
            }
            return result;
        }

        /** A realization counts only if its model has a dataflow in this project and it has a layout id. */
        private boolean isValidRealization(NativeQueryRealization realization) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            NDataflowManager dfManager = NDataflowManager.getInstance(config, QueryHistoryTaskScheduler.this.project);
            return dfManager.getDataflow(realization.getModelId()) != null && realization.getLayoutId() != null;
        }

        /**
         * Counts how often each snapshot table was referenced by the batch. Histories without
         * a query-history info are ignored.
         */
        private Map<TableExtDesc, Integer> collectSnapshotHitCount(List<QueryHistory> queryHistories) {
            NTableMetadataManager tableManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), QueryHistoryTaskScheduler.this.project);
            Map<TableExtDesc, Integer> results = Maps.newHashMap();
            for (QueryHistory queryHistory : queryHistories) {
                if (queryHistory.getQueryHistoryInfo() == null) {
                    continue;
                }
                // NOTE(review): assumes getQuerySnapshots() yields lists of table identities (strings) — confirm.
                List<List<String>> snapshotsInRealization = queryHistory.getQueryHistoryInfo().getQuerySnapshots();
                for (List<String> snapshots : snapshotsInRealization) {
                    snapshots.forEach(tableIdentify -> results.merge(tableManager.getOrCreateTableExt(tableIdentify), 1, Integer::sum));
                }
            }
            return results;
        }

        /**
         * Records the query time of {@code queryHistory} for every model it touched, keeping
         * the largest time seen so far per model.
         *
         * <p>Using the maximum (instead of overwriting) keeps an older query that appears
         * later in the batch from moving a model's last query time backwards. Histories
         * without realizations are skipped.
         */
        private void collectModelLastQueryTime(QueryHistory queryHistory, Map<String, Long> modelsLastQueryTime) {
            List<NativeQueryRealization> realizations = queryHistory.transformRealizations(QueryHistoryTaskScheduler.this.project);
            if (CollectionUtils.isEmpty(realizations)) {
                return;
            }
            long queryTime = queryHistory.getQueryTime();
            for (NativeQueryRealization realization : realizations) {
                String modelId = realization.getModelId();
                if (StringUtils.isEmpty(modelId)) {
                    continue;
                }
                modelsLastQueryTime.merge(modelId, queryTime, Long::max);
            }
        }

        /**
         * Adds the collected hit counts to each still-existing dataflow: the dataflow counter
         * is increased and the per-layout frequencies are merged into the stored ones.
         */
        private void incQueryHitCount(Map<String, DataflowHitCount> dfHitCountMap, String project) {
            NDataflowManager dfManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
            for (Map.Entry<String, DataflowHitCount> entry : dfHitCountMap.entrySet()) {
                if (dfManager.getDataflow(entry.getKey()) == null) {
                    continue;
                }
                Map<Long, FrequencyMap> layoutHitCount = entry.getValue().getLayoutHits();
                dfManager.updateDataflow(entry.getKey(), copyForWrite -> {
                    copyForWrite.setQueryHitCount(copyForWrite.getQueryHitCount() + entry.getValue().getDataflowHit());
                    for (Map.Entry<Long, FrequencyMap> layoutHitEntry : layoutHitCount.entrySet()) {
                        copyForWrite.getLayoutHitCount().merge(layoutHitEntry.getKey(), layoutHitEntry.getValue(), FrequencyMap::merge);
                    }
                });
            }
        }

        /** Adds the collected snapshot hits to each table ext and saves the modified copy. */
        private void incQueryHitSnapshotCount(Map<TableExtDesc, Integer> hitSnapshotCountMap, String project) {
            NTableMetadataManager tableManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
            for (Map.Entry<TableExtDesc, Integer> entry : hitSnapshotCountMap.entrySet()) {
                if (tableManager.getOrCreateTableExt(entry.getKey().getIdentity()) == null) {
                    continue;
                }
                TableExtDesc tableCopy = tableManager.copyForWrite(entry.getKey());
                tableCopy.setSnapshotHitCount(tableCopy.getSnapshotHitCount() + entry.getValue());
                tableManager.saveTableExt(tableCopy);
            }
        }

        /** Stores the collected last query time on every dataflow that still exists. */
        private void updateLastQueryTime(Map<String, Long> modelsLastQueryTime, String project) {
            NDataflowManager dfManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
            for (Map.Entry<String, Long> entry : modelsLastQueryTime.entrySet()) {
                String dataflowId = entry.getKey();
                Long lastQueryTime = entry.getValue();
                if (dfManager.getDataflow(dataflowId) == null) {
                    continue;
                }
                dfManager.updateDataflow(dataflowId, copyForWrite -> copyForWrite.setLastQueryTime(lastQueryTime.longValue()));
            }
        }
    }
}

