/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.timeline.service.handlers.marker;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarkerBasedEarlyConflictDetectionRunnable
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MarkerBasedEarlyConflictDetectionRunnable.class);
    private MarkerHandler markerHandler;
    private String markerDir;
    private String basePath;
    private FileSystem fs;
    private AtomicBoolean hasConflict;
    private long maxAllowableHeartbeatIntervalInMs;
    private Set<HoodieInstant> completedCommits;
    private final boolean checkCommitConflict;

    public MarkerBasedEarlyConflictDetectionRunnable(AtomicBoolean hasConflict, MarkerHandler markerHandler, String markerDir, String basePath, FileSystem fileSystem2, long maxAllowableHeartbeatIntervalInMs, Set<HoodieInstant> completedCommits, boolean checkCommitConflict) {
        this.markerHandler = markerHandler;
        this.markerDir = markerDir;
        this.basePath = basePath;
        this.fs = fileSystem2;
        this.hasConflict = hasConflict;
        this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
        this.completedCommits = completedCommits;
        this.checkCommitConflict = checkCommitConflict;
    }

    @Override
    public void run() {
        if (this.hasConflict.get()) {
            return;
        }
        try {
            Set<String> pendingMarkers = this.markerHandler.getPendingMarkersToProcess(this.markerDir);
            if (!this.fs.exists(new Path(this.markerDir)) && pendingMarkers.isEmpty()) {
                return;
            }
            HoodieTimer timer = HoodieTimer.start();
            HashSet<String> currentInstantAllMarkers = new HashSet<String>();
            currentInstantAllMarkers.addAll(this.markerHandler.getAllMarkers(this.markerDir));
            currentInstantAllMarkers.addAll(pendingMarkers);
            Path tempPath = new Path(this.basePath + "/" + ".hoodie/.temp");
            List<Path> instants = MarkerUtils.getAllMarkerDir(tempPath, this.fs);
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(this.basePath).setLoadActiveTimelineOnLoad(true).build();
            HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
            List<String> candidate = MarkerUtils.getCandidateInstants(activeTimeline, instants, MarkerUtils.markerDirToInstantTime(this.markerDir), this.maxAllowableHeartbeatIntervalInMs, this.fs, this.basePath);
            Set tableMarkers = candidate.stream().flatMap(instant -> MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(instant, this.fs, new HoodieLocalEngineContext(new Configuration()), 100).values().stream().flatMap(Collection::stream)).collect(Collectors.toSet());
            Set currentFileIDs = currentInstantAllMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet());
            Set tableFilesIDs = tableMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet());
            currentFileIDs.retainAll(tableFilesIDs);
            if (!currentFileIDs.isEmpty() || this.checkCommitConflict && MarkerUtils.hasCommitConflict(activeTimeline, currentInstantAllMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet()), this.completedCommits)) {
                LOG.warn("Conflict writing detected based on markers!\nConflict markers: " + currentInstantAllMarkers + "\nTable markers: " + tableMarkers);
                this.hasConflict.compareAndSet(false, true);
            }
            LOG.info("Finish batching marker-based conflict detection in " + timer.endTimer() + " ms");
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException occurs during checking marker conflict");
        }
    }
}

