/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.marker;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.AppendMarkerHandler;
import org.apache.hudi.table.marker.DirectWriteMarkers;

/**
 * {@link DirectWriteMarkers} variant that additionally implements {@link AppendMarkerHandler}:
 * it creates markers of type {@link IOType#APPEND} and can list the paths that such markers
 * translate to.
 */
public class DirectWriteMarkersV1 extends DirectWriteMarkers implements AppendMarkerHandler {
    /**
     * Matches a path string that contains {@code ".marker"} and ends with the name of
     * {@link IOType#APPEND}.
     */
    private static final Predicate<String> APPEND_MARKER_PREDICATE =
            pathStr -> pathStr.contains(".marker") && pathStr.endsWith(IOType.APPEND.name());

    /**
     * Creates the marker handler; both arguments are passed unchanged to the superclass.
     *
     * @param table       table handed to {@link DirectWriteMarkers}
     * @param instantTime instant time handed to {@link DirectWriteMarkers}
     */
    public DirectWriteMarkersV1(HoodieTable table, String instantTime) {
        super(table, instantTime);
    }

    /**
     * Creates a marker of type {@link IOType#APPEND} by delegating to {@code createIfNotExists}.
     *
     * @param partitionPath  forwarded unchanged to {@code createIfNotExists}
     * @param fileName       forwarded unchanged to {@code createIfNotExists}
     * @param writeConfig    forwarded unchanged to {@code createIfNotExists}
     * @param fileId         forwarded unchanged to {@code createIfNotExists}
     * @param activeTimeline forwarded unchanged to {@code createIfNotExists}
     * @return whatever {@code createIfNotExists} returns for the {@code APPEND} marker
     */
    @Override
    public Option<StoragePath> createLogMarkerIfNotExists(String partitionPath, String fileName,
            HoodieWriteConfig writeConfig, String fileId, HoodieActiveTimeline activeTimeline) {
        return this.createIfNotExists(partitionPath, fileName, IOType.APPEND, writeConfig, fileId, activeTimeline);
    }

    /**
     * Collects the paths corresponding to the append markers found under the marker directory.
     *
     * <p>The direct entries of the marker directory are split by
     * {@code getSubDirectoriesByMarkerCondition} into a list of sub-directories and a set of
     * already-resolved paths. Each sub-directory is then walked through the engine context, and
     * every file matching the append-marker predicate is translated with
     * {@code translateMarkerToDataPath} and added to that set.
     *
     * @param context     engine context used to walk the sub-directories
     * @param parallelism upper bound on the parallelism; it is capped at the number of
     *                    sub-directories
     * @return the set obtained from {@code getSubDirectoriesByMarkerCondition}, extended with the
     *         paths found in the sub-directories
     * @throws IOException if listing the marker directory fails
     */
    @Override
    public Set<String> getAppendedLogPaths(HoodieEngineContext context, int parallelism) throws IOException {
        Pair<List<String>, Set<String>> subDirectoriesAndDataFiles = this.getSubDirectoriesByMarkerCondition(
                this.storage.listDirectEntries(this.markerDirPath), APPEND_MARKER_PREDICATE);
        List<String> subDirectories = subDirectoriesAndDataFiles.getLeft();
        Set<String> logFiles = subDirectoriesAndDataFiles.getRight();
        if (subDirectories.isEmpty()) {
            return logFiles;
        }

        // There is no point in using more tasks than there are directories to walk.
        int effectiveParallelism = Math.min(subDirectories.size(), parallelism);
        // Captured by the lambda below, which opens its own storage handle from this configuration.
        StorageConfiguration<?> storageConf = this.storage.getConf();
        context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
        logFiles.addAll(context.flatMap(subDirectories, directory -> {
            // Iterative, queue-based walk of the directory tree rooted at `directory`.
            LinkedList<StoragePath> candidatesDirs = new LinkedList<>();
            candidatesDirs.add(new StoragePath(directory));
            List<String> result = new ArrayList<>();
            while (!candidatesDirs.isEmpty()) {
                StoragePath path = candidatesDirs.remove();
                // Named differently from the inherited `storage` field to avoid shadowing it.
                HoodieStorage dirStorage = HoodieStorageUtils.getStorage(path, storageConf);
                List<StoragePathInfo> storagePathInfos = dirStorage.listDirectEntries(path);
                for (StoragePathInfo pathInfo : storagePathInfos) {
                    if (pathInfo.isDirectory()) {
                        candidatesDirs.add(pathInfo.getPath());
                        continue;
                    }
                    String pathStr = pathInfo.getPath().toString();
                    if (APPEND_MARKER_PREDICATE.test(pathStr)) {
                        result.add(this.translateMarkerToDataPath(pathStr));
                    }
                }
            }
            return result.stream();
        }, effectiveParallelism));
        return logFiles;
    }
}

