/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.util;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieHeartbeatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps a heartbeat file alive for one client id and derives the next free client id from the
 * heartbeat files that already exist.
 *
 * <p>Heartbeat files live under {@code <basePath>/.hoodie/.aux/.ids}. Each one is named
 * {@code _<clientId>} (just {@code _} for the initial, empty client id) and has no payload:
 * nothing is ever written to it, it is only re-created. {@link #isHeartbeatExpired} judges
 * liveness by comparing the file's modification time with the current time.
 *
 * <p>NOTE(review): {@code fs} is {@code transient}, so an instance restored through Java
 * serialization has no file system and cannot write heartbeats; {@code executor} is not
 * {@code transient} although executors are generally not serializable. Confirm how instances
 * are actually shipped before relying on {@link Serializable} here.
 */
public class ClientIds implements AutoCloseable, Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ClientIds.class);
    private static final String HEARTBEAT_FOLDER_NAME = ".ids";
    private static final String HEARTBEAT_FILE_NAME_PREFIX = "_";

    /** Client id of the first writer; its heartbeat file name is the bare prefix. */
    public static final String INIT_CLIENT_ID = "";
    /** Default period between two heartbeat refreshes, in milliseconds. */
    public static final long DEFAULT_HEARTBEAT_INTERVAL_IN_MS = 60000L;
    /** Default number of missed heartbeats after which a client counts as expired. */
    public static final int DEFAULT_NUM_TOLERABLE_HEARTBEAT_MISSES = 5;

    /**
     * Orders heartbeat files by name length first and by name second. For the generated ids
     * (decimal numbers without leading zeros) this is exactly numeric order, whereas a plain
     * name comparison would put {@code _10} before {@code _2}.
     */
    private static final Comparator<Path> HEARTBEAT_PATH_ORDER =
        Comparator.<Path>comparingInt(path -> path.getName().length()).thenComparing(Path::getName);

    private final transient FileSystem fs;
    private final Path heartbeatFilePath;
    private final long heartbeatIntervalInMs;
    private final long heartbeatTimeoutThresholdInMs;
    private ScheduledExecutorService executor;
    private boolean started;

    private ClientIds(FileSystem fs, String basePath, String uniqueId, long heartbeatIntervalInMs, int numTolerableHeartbeatMisses) {
        this.fs = fs;
        this.heartbeatFilePath = this.getHeartbeatFilePath(basePath, uniqueId);
        this.heartbeatIntervalInMs = heartbeatIntervalInMs;
        // Widen before multiplying so a large interval cannot overflow int arithmetic.
        this.heartbeatTimeoutThresholdInMs = (long) numTolerableHeartbeatMisses * heartbeatIntervalInMs;
    }

    /**
     * Writes an initial heartbeat and schedules periodic refreshes on a single-threaded executor.
     * Calling this on an already started instance only logs and returns.
     *
     * <p>Per the {@link ScheduledExecutorService#scheduleAtFixedRate} contract, an exception
     * thrown by one refresh suppresses all later refreshes.
     *
     * @throws HoodieHeartbeatException if the initial heartbeat cannot be written
     */
    public void start() {
        if (this.started) {
            LOG.info("The service heartbeat client is already started, skips the action");
            // Returning here is essential: falling through would create a second executor and
            // orphan the first one, which close() could then never shut down.
            return;
        }
        this.updateHeartbeat();
        this.executor = Executors.newScheduledThreadPool(1);
        this.executor.scheduleAtFixedRate(this::updateHeartbeat, this.heartbeatIntervalInMs, this.heartbeatIntervalInMs, TimeUnit.MILLISECONDS);
        this.started = true;
    }

    /**
     * Creates a builder with the default interval, tolerable misses and client id.
     *
     * @return a new {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Stops the periodic heartbeat, if one is running. The heartbeat file itself is left in
     * place. The instance can be started again afterwards.
     */
    @Override
    public void close() {
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
        this.started = false;
    }

    /**
     * Tells whether the heartbeat file at {@code path} is older than {@code timeoutThreshold}.
     *
     * @param fs               file system holding the heartbeat file
     * @param path             heartbeat file to inspect
     * @param timeoutThreshold maximum tolerated age in milliseconds
     * @return {@code true} only if the file exists and its modification time lies more than
     *         {@code timeoutThreshold} ms in the past; {@code false} if the file is missing or
     *         the check itself fails with an {@link IOException}
     */
    public static boolean isHeartbeatExpired(FileSystem fs, Path path, long timeoutThreshold) {
        try {
            if (fs.exists(path)) {
                long modifyTime = fs.getFileStatus(path).getModificationTime();
                long currentTime = System.currentTimeMillis();
                return currentTime - modifyTime > timeoutThreshold;
            }
        } catch (IOException e) {
            // Best effort: a failed check is reported as "not expired" so that a live client's
            // file is never deleted because of a transient error. Keep the cause in the log.
            LOG.error("Check heartbeat file existence error: " + path, e);
        }
        return false;
    }

    /** Returns the folder that holds all heartbeat files of the table at {@code basePath}. */
    private String getHeartbeatFolderPath(String basePath) {
        return basePath + "/" + ".hoodie/.aux" + "/" + HEARTBEAT_FOLDER_NAME;
    }

    /**
     * Returns the heartbeat file for {@code uniqueId}; a null or empty id maps to the bare
     * file name prefix.
     */
    private Path getHeartbeatFilePath(String basePath, String uniqueId) {
        String heartbeatFolderPath = this.getHeartbeatFolderPath(basePath);
        String fileName = StringUtils.isNullOrEmpty(uniqueId) ? HEARTBEAT_FILE_NAME_PREFIX : HEARTBEAT_FILE_NAME_PREFIX + uniqueId;
        return new Path(heartbeatFolderPath, fileName);
    }

    /** Refreshes this client's own heartbeat file. */
    private void updateHeartbeat() throws HoodieHeartbeatException {
        this.updateHeartbeat(this.heartbeatFilePath);
    }

    /**
     * (Re-)creates {@code heartbeatFilePath} with overwrite enabled and closes it immediately.
     *
     * @throws HoodieHeartbeatException if the file cannot be created or closed
     */
    private void updateHeartbeat(Path heartbeatFilePath) throws HoodieHeartbeatException {
        try (FSDataOutputStream outputStream = this.fs.create(heartbeatFilePath, true)) {
            // Intentionally empty: creating the file is the heartbeat, there is no payload.
        } catch (IOException io) {
            throw new HoodieHeartbeatException("Unable to generate heartbeat for file path " + heartbeatFilePath, io);
        }
    }

    /**
     * Picks the next client id for the table configured in {@code conf} and immediately writes
     * a heartbeat file for it.
     *
     * @param conf Flink configuration providing {@link FlinkOptions#PATH} and the Hadoop settings
     * @return the chosen client id; {@link #INIT_CLIENT_ID} if no heartbeat file exists yet
     * @throws HoodieHeartbeatException if the heartbeat for the new id cannot be written
     * @throws RuntimeException         if listing or cleaning the heartbeat folder fails
     */
    @VisibleForTesting
    public String nextId(Configuration conf) {
        String basePath = conf.getString(FlinkOptions.PATH);
        String nextId = this.nextId(conf, basePath);
        this.updateHeartbeat(this.getHeartbeatFilePath(basePath, nextId));
        return nextId;
    }

    /**
     * Computes the next client id without claiming it.
     *
     * <ul>
     *   <li>No folder or no heartbeat file: {@link #INIT_CLIENT_ID}.</li>
     *   <li>Expired heartbeat files exist: all of them are deleted and the id of the first one
     *       (in {@link #HEARTBEAT_PATH_ORDER}) is reused.</li>
     *   <li>Otherwise: the largest existing id plus one, or {@code "1"} if only the initial
     *       client exists.</li>
     * </ul>
     *
     * @throws NumberFormatException if the largest existing id is not a decimal integer
     */
    private String nextId(Configuration conf, String basePath) {
        Path heartbeatFolderPath = new Path(this.getHeartbeatFolderPath(basePath));
        FileSystem folderFs = FSUtils.getFs(heartbeatFolderPath, HadoopConfigurations.getHadoopConf(conf));
        try {
            if (!folderFs.exists(heartbeatFolderPath)) {
                return INIT_CLIENT_ID;
            }
            List<Path> sortedPaths = Arrays.stream(folderFs.listStatus(heartbeatFolderPath))
                .map(FileStatus::getPath)
                .sorted(HEARTBEAT_PATH_ORDER)
                .collect(Collectors.toList());
            if (sortedPaths.isEmpty()) {
                return INIT_CLIENT_ID;
            }
            List<Path> zombieHeartbeatPaths = sortedPaths.stream()
                .filter(path -> ClientIds.isHeartbeatExpired(folderFs, path, this.heartbeatTimeoutThresholdInMs))
                .collect(Collectors.toList());
            if (!zombieHeartbeatPaths.isEmpty()) {
                for (Path zombiePath : zombieHeartbeatPaths) {
                    folderFs.delete(zombiePath, true);
                    LOG.warn("Delete inactive ckp metadata path: " + zombiePath);
                }
                return ClientIds.getClientId(zombieHeartbeatPaths.get(0));
            }
            // The list is in numeric id order, so the last entry carries the largest id.
            String largestClientId = ClientIds.getClientId(sortedPaths.get(sortedPaths.size() - 1));
            return INIT_CLIENT_ID.equals(largestClientId) ? "1" : String.valueOf(Integer.parseInt(largestClientId) + 1);
        } catch (IOException e) {
            throw new RuntimeException("Generate next client id error", e);
        }
    }

    /**
     * Extracts the client id from a heartbeat file name by stripping the leading prefix.
     * Names that do not start with the prefix yield {@link #INIT_CLIENT_ID}.
     */
    private static String getClientId(Path path) {
        String fileName = path.getName();
        if (!fileName.startsWith(HEARTBEAT_FILE_NAME_PREFIX)) {
            return INIT_CLIENT_ID;
        }
        // Strip only the leading prefix so ids that contain the prefix character stay intact.
        return fileName.substring(HEARTBEAT_FILE_NAME_PREFIX.length());
    }

    /**
     * Fluent builder for {@link ClientIds}. {@code fs} and {@code basePath} are mandatory,
     * either set directly or derived through {@link #conf(Configuration)}.
     */
    public static class Builder {
        private FileSystem fs;
        private String basePath;
        private String clientId = INIT_CLIENT_ID;
        private long heartbeatIntervalInMs = DEFAULT_HEARTBEAT_INTERVAL_IN_MS;
        private int numTolerableHeartbeatMisses = DEFAULT_NUM_TOLERABLE_HEARTBEAT_MISSES;

        /** Sets the file system used to write this client's heartbeat file. */
        public Builder fs(FileSystem fs) {
            this.fs = fs;
            return this;
        }

        /** Sets the table base path under which the heartbeat folder is resolved. */
        public Builder basePath(String basePath) {
            this.basePath = basePath;
            return this;
        }

        /** Sets the client id; null or empty selects the initial client's file name. */
        public Builder clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        /**
         * Derives base path, file system and client id from {@code conf}, overwriting any
         * values set earlier on this builder.
         */
        public Builder conf(Configuration conf) {
            this.basePath = conf.getString(FlinkOptions.PATH);
            this.fs = FSUtils.getFs(this.basePath, HadoopConfigurations.getHadoopConf(conf));
            this.clientId = conf.getString(FlinkOptions.WRITE_CLIENT_ID);
            return this;
        }

        /** Sets the period between two heartbeat refreshes, in milliseconds. */
        public Builder heartbeatIntervalInMs(long interval) {
            this.heartbeatIntervalInMs = interval;
            return this;
        }

        /** Sets how many heartbeat intervals may be missed before a client counts as expired. */
        public Builder numTolerableHeartbeatMisses(int numMisses) {
            this.numTolerableHeartbeatMisses = numMisses;
            return this;
        }

        /**
         * Builds the {@link ClientIds} instance.
         *
         * @throws NullPointerException if the file system or base path has not been set
         */
        public ClientIds build() {
            return new ClientIds(
                Objects.requireNonNull(this.fs, "fs must be set"),
                Objects.requireNonNull(this.basePath, "basePath must be set"),
                this.clientId,
                this.heartbeatIntervalInMs,
                this.numTolerableHeartbeatMisses);
        }
    }
}

