/*
 * Decompiled with CFR 0.152.
 */
package io.delta.flink.source.internal.enumerator.monitor;

import io.delta.flink.source.internal.enumerator.monitor.ChangesPerVersion;
import io.delta.flink.source.internal.enumerator.monitor.TableMonitorResult;
import io.delta.flink.source.internal.enumerator.processor.ActionProcessor;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;

public class TableMonitor
implements Callable<TableMonitorResult> {
    private final DeltaLog deltaLog;
    private final ActionProcessor actionProcessor;
    private final long maxDurationMillis;
    private long monitorVersion;

    public TableMonitor(DeltaLog deltaLog, long l, long l2, ActionProcessor actionProcessor) {
        this.deltaLog = deltaLog;
        this.monitorVersion = l;
        this.maxDurationMillis = l2;
        this.actionProcessor = actionProcessor;
    }

    @Override
    public TableMonitorResult call() throws Exception {
        TableMonitorResult tableMonitorResult = this.monitorForChanges(this.monitorVersion);
        List<ChangesPerVersion<AddFile>> list = tableMonitorResult.getChanges();
        if (!list.isEmpty()) {
            this.monitorVersion = list.get(list.size() - 1).getSnapshotVersion() + 1L;
        }
        return tableMonitorResult;
    }

    public long getMonitorVersion() {
        return this.monitorVersion;
    }

    private TableMonitorResult monitorForChanges(long l) {
        Iterator iterator = this.deltaLog.getChanges(l, true);
        if (iterator.hasNext()) {
            return this.processChanges(iterator);
        }
        return new TableMonitorResult(Collections.emptyList());
    }

    private TableMonitorResult processChanges(Iterator<VersionLog> iterator) {
        ArrayList<ChangesPerVersion<AddFile>> arrayList = new ArrayList<ChangesPerVersion<AddFile>>();
        long l = System.currentTimeMillis() + this.maxDurationMillis;
        String string = this.deltaLog.getPath().toUri().normalize().toString();
        while (iterator.hasNext()) {
            VersionLog versionLog = iterator.next();
            ChangesPerVersion<Action> changesPerVersion = new ChangesPerVersion<Action>(string, versionLog.getVersion(), versionLog.getActions());
            ChangesPerVersion<AddFile> changesPerVersion2 = this.actionProcessor.processActions(changesPerVersion);
            arrayList.add(changesPerVersion2);
            if (System.currentTimeMillis() < l) continue;
            break;
        }
        return new TableMonitorResult(arrayList);
    }
}

