/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.meta;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.meta.CkpMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checkpoint metadata kept as marker files under
 * {@code <basePath>/.hoodie/.aux/ckp_meta}.
 *
 * <p>Each instant is represented by one or more empty files whose names are produced by
 * {@link CkpMessage#getFileName}; the file name encodes the instant and its state.
 * Writers create the files through {@link #startInstant}, {@link #commitInstant} and
 * {@link #abortInstant}; readers list the directory and collapse the files of one instant
 * into a single {@link CkpMessage}.
 *
 * <p>The instance that calls {@link #startInstant} also prunes old marker files so that at most
 * {@link #MAX_RETAIN_CKP_NUM} instants started through it are retained.
 */
public class CkpMetadata
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
    /** Maximum number of instants (started through this instance) whose marker files are kept. */
    protected static final int MAX_RETAIN_CKP_NUM = 3;
    /** Name of the checkpoint metadata directory under the auxiliary folder. */
    private static final String CKP_META = "ckp_meta";
    // NOTE(review): a FileSystem handle is held in a non-transient field of a Serializable
    // class — confirm instances are never actually Java-serialized.
    private final FileSystem fs;
    /** Directory that holds the checkpoint marker files. */
    protected final Path path;
    /** Result of the most recent directory scan; {@code null} until {@link #load()} has run. */
    private List<CkpMessage> messages;
    /** Instants started through this instance, oldest first; lazily created by {@link #clean}. */
    private List<String> instantCache;

    /**
     * Builds the metadata handle from the table path configured under {@link FlinkOptions#PATH}.
     *
     * @param config Flink configuration providing the table base path and Hadoop settings
     */
    private CkpMetadata(Configuration config) {
        this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
    }

    /**
     * @param fs       file system used for all marker file operations
     * @param basePath table base path; the metadata directory is derived via {@link #ckpMetaPath}
     */
    private CkpMetadata(FileSystem fs, String basePath) {
        this.fs = fs;
        this.path = new Path(CkpMetadata.ckpMetaPath(basePath));
    }

    /** Drops the in-memory cache of started instants; no files are touched. */
    public void close() {
        this.instantCache = null;
    }

    /**
     * Resets the metadata directory: recursively deletes it and creates it again.
     *
     * @param metaClient not used by this implementation
     * @throws IOException if the delete or mkdirs call fails
     */
    public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
        this.fs.delete(this.path, true);
        this.fs.mkdirs(this.path);
    }

    /**
     * Creates the {@code INFLIGHT} marker file for the instant, then prunes old markers.
     *
     * @param instant the instant being started
     * @throws HoodieException if the marker file cannot be created
     */
    public void startInstant(String instant) {
        Path markerFile = this.fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
        try {
            this.fs.createNewFile(markerFile);
        }
        catch (IOException e) {
            throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
        }
        this.clean(instant);
    }

    /**
     * Records {@code newInstant} in the cache and, once more than {@link #MAX_RETAIN_CKP_NUM}
     * instants are cached, deletes every marker file of the oldest cached instant.
     *
     * <p>Deletion is best effort: failures are logged, and the oldest instant stays in the cache
     * so that the next call retries it.
     *
     * @param newInstant the instant that has just been started
     */
    private void clean(String newInstant) {
        if (this.instantCache == null) {
            this.instantCache = new ArrayList<>();
        }
        this.instantCache.add(newInstant);
        if (this.instantCache.size() <= MAX_RETAIN_CKP_NUM) {
            return;
        }
        String oldest = this.instantCache.get(0);
        boolean failed = false;
        for (String fileName : CkpMessage.getAllFileNames(oldest)) {
            Path staleFile = this.fullPath(fileName);
            try {
                this.fs.delete(staleFile, false);
            }
            catch (IOException e) {
                failed = true;
                // Keep going so the remaining files of this instant are still attempted.
                LOG.warn("Exception while cleaning the checkpoint meta file: {}", staleFile, e);
            }
        }
        if (!failed) {
            // Only forget the instant once all of its files were deleted without error.
            this.instantCache.remove(0);
        }
    }

    /**
     * Creates the {@code COMPLETED} marker file for the instant.
     *
     * @param instant the instant being committed
     * @throws HoodieException if the marker file cannot be created
     */
    public void commitInstant(String instant) {
        Path markerFile = this.fullPath(CkpMessage.getFileName(instant, CkpMessage.State.COMPLETED));
        try {
            this.fs.createNewFile(markerFile);
        }
        catch (IOException e) {
            throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + instant, e);
        }
    }

    /**
     * Creates the {@code ABORTED} marker file for the instant.
     *
     * @param instant the instant being aborted
     * @throws HoodieException if the marker file cannot be created
     */
    public void abortInstant(String instant) {
        Path markerFile = this.fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED));
        try {
            this.fs.createNewFile(markerFile);
        }
        catch (IOException e) {
            // Propagate the cause, consistent with startInstant/commitInstant.
            throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant, e);
        }
    }

    /**
     * Rescans the metadata directory and replaces {@link #messages} with the result.
     *
     * @throws HoodieException if listing the directory fails
     */
    private void load() {
        try {
            this.messages = this.scanCkpMetadata(this.path);
        }
        catch (IOException e) {
            throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path, e);
        }
    }

    /**
     * Reloads the metadata and returns the instant of the last message (in sorted order)
     * that is not complete.
     *
     * @return the instant of the last non-complete message, or {@code null} if every message is complete
     */
    @Nullable
    public String lastPendingInstant() {
        this.load();
        for (int i = this.messages.size() - 1; i >= 0; --i) {
            CkpMessage ckpMsg = this.messages.get(i);
            if (!ckpMsg.isComplete()) {
                return ckpMsg.getInstant();
            }
        }
        return null;
    }

    /**
     * Reloads the metadata and returns the messages, one per instant, in sorted order.
     *
     * @return the freshly loaded message list
     */
    public List<CkpMessage> getMessages() {
        this.load();
        return this.messages;
    }

    /**
     * Tells whether the given instant is aborted according to the messages already loaded.
     * This method does not reload; {@link #getMessages()} or {@link #lastPendingInstant()}
     * must have been called before, otherwise the state check fails.
     *
     * @param instant the instant to look up
     * @return {@code true} if a loaded message for the instant reports aborted
     */
    public boolean isAborted(String instant) {
        ValidationUtils.checkState(this.messages != null, "The checkpoint metadata should #load first");
        return this.messages.stream().anyMatch(ckpMsg -> instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted());
    }

    /**
     * Creates an instance from the Flink configuration.
     *
     * @param config configuration that carries {@link FlinkOptions#PATH}
     * @return a new instance
     */
    public static CkpMetadata getInstance(Configuration config) {
        return new CkpMetadata(config);
    }

    /**
     * Creates an instance from an existing file system and table base path.
     *
     * @param fs       file system to operate on
     * @param basePath table base path
     * @return a new instance
     */
    public static CkpMetadata getInstance(FileSystem fs, String basePath) {
        return new CkpMetadata(fs, basePath);
    }

    /**
     * Resolves the checkpoint metadata directory for a table.
     *
     * @param basePath table base path
     * @return {@code basePath + "/.hoodie/.aux/ckp_meta"}
     */
    protected static String ckpMetaPath(String basePath) {
        return basePath + "/" + ".hoodie/.aux" + "/" + CKP_META;
    }

    /** Resolves a marker file name against the metadata directory. */
    private Path fullPath(String fileName) {
        return new Path(this.path, fileName);
    }

    /**
     * Lists the metadata directory and returns one message per instant, sorted by the natural
     * ordering of {@link CkpMessage}.
     *
     * @param ckpMetaPath directory to list
     * @return the collapsed, sorted messages
     * @throws IOException if listing the directory fails
     */
    private List<CkpMessage> scanCkpMetadata(Path ckpMetaPath) throws IOException {
        return Arrays.stream(this.fs.listStatus(ckpMetaPath))
            .map(CkpMessage::new)
            .collect(Collectors.groupingBy(CkpMessage::getInstant))
            .values().stream()
            .map(CkpMetadata::pickHighestState)
            .sorted()
            .collect(Collectors.toList());
    }

    /**
     * Picks the message with the greatest state among the messages of one instant; on equal
     * states the earlier element wins.
     *
     * @param sameInstant non-empty list of messages sharing one instant
     * @return the message whose state compares highest
     */
    private static CkpMessage pickHighestState(List<CkpMessage> sameInstant) {
        CkpMessage best = sameInstant.get(0);
        for (CkpMessage candidate : sameInstant.subList(1, sameInstant.size())) {
            if (best.getState().compareTo(candidate.getState()) < 0) {
                best = candidate;
            }
        }
        return best;
    }
}

