/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.upgrade;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import org.apache.hudi.table.upgrade.DowngradeHandler;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;

public class TwoToOneDowngradeHandler
implements DowngradeHandler {
    @Override
    public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
        HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
        HoodieTableMetaClient metaClient = table.getMetaClient();
        HoodieTimeline inflightTimeline = metaClient.getCommitsTimeline().filterPendingExcludingCompactionAndLogCompaction();
        List commits = inflightTimeline.getReverseOrderedInstants().collect(Collectors.toList());
        for (HoodieInstant inflightInstant : commits) {
            try {
                this.convertToDirectMarkers(inflightInstant.requestedTime(), table, context, config.getMarkersDeleteParallelism());
            }
            catch (IOException e) {
                throw new HoodieException("Converting marker files to DIRECT style failed during downgrade", e);
            }
        }
        return Collections.emptyMap();
    }

    private void convertToDirectMarkers(String commitInstantTime, HoodieTable table, HoodieEngineContext context, int parallelism) throws IOException {
        block5: {
            HoodieStorage storage2;
            String markerDir;
            block4: {
                markerDir = table.getMetaClient().getMarkerFolderPath(commitInstantTime);
                storage2 = HoodieStorageUtils.getStorage(markerDir, context.getStorageConf().newInstance());
                Option<MarkerType> markerTypeOption = MarkerUtils.readMarkerType(storage2, markerDir);
                if (!markerTypeOption.isPresent()) break block4;
                switch (markerTypeOption.get()) {
                    case TIMELINE_SERVER_BASED: {
                        Map<String, Set<String>> markersMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(markerDir, storage2, context, parallelism);
                        DirectWriteMarkers directWriteMarkers = new DirectWriteMarkers(table, commitInstantTime);
                        markersMap.values().stream().flatMap(Collection::stream).forEach(directWriteMarkers::create);
                        MarkerUtils.deleteMarkerTypeFile(storage2, markerDir);
                        this.deleteTimelineBasedMarkerFiles(context, markerDir, storage2, parallelism);
                        break block5;
                    }
                    default: {
                        throw new HoodieException("The marker type \"" + markerTypeOption.get().name() + "\" is not supported for rollback.");
                    }
                }
            }
            if (storage2.exists(new StoragePath(markerDir))) {
                this.deleteTimelineBasedMarkerFiles(context, markerDir, storage2, parallelism);
            }
        }
    }

    private void deleteTimelineBasedMarkerFiles(HoodieEngineContext context, String markerDir, HoodieStorage storage2, int parallelism) throws IOException {
        Predicate<StoragePathInfo> prefixFilter = fileStatus -> fileStatus.getPath().getName().startsWith("MARKERS");
        FSUtils.parallelizeSubPathProcess(context, storage2, new StoragePath(markerDir), parallelism, prefixFilter, pairOfSubPathAndConf -> FSUtils.deleteSubPath((String)pairOfSubPathAndConf.getKey(), (StorageConfiguration)pairOfSubPathAndConf.getValue(), false));
    }
}

