/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.store.rocksdb;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.StateConfigKeys;
import com.antgroup.geaflow.common.errorcode.RuntimeErrors;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.thread.Executors;
import com.antgroup.geaflow.common.tuple.Tuple;
import com.antgroup.geaflow.file.FileConfigKeys;
import com.antgroup.geaflow.file.FileInfo;
import com.antgroup.geaflow.file.IPersistentIO;
import com.antgroup.geaflow.file.PersistentIOBuilder;
import com.antgroup.geaflow.store.rocksdb.RocksdbConfigKeys;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksdbPersistClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocksdbPersistClient.class);
    // Name of the empty marker file created in a checkpoint's remote meta directory once all uploads finished.
    private static final String COMMIT_TAG_FILE = "_commit";
    // Name of the file that lists the sst file names belonging to a checkpoint.
    private static final String FILES = "FILES";
    // NOTE(review): not referenced by name in this file; the constructor passes the literal 64 to
    // Executors.getService — presumably this constant was inlined there. Confirm.
    private static final int DELETE_CAPACITY = 64;
    // Remote sub-directory (under the remote root) that holds the uploaded sst files.
    private static final String DATAS = "datas";
    // Prefix of the per-checkpoint remote meta directories ("meta.<chkId>").
    private static final String META = "meta";
    // Separator used between sst file names inside the FILES listing.
    private static final String FILE_SEPARATOR = ",";
    // File-name suffix used to tell sst data files apart from the other checkpoint files.
    private static final String SST_SUFFIX = "sst";
    // Per-future wait limit, in seconds, applied in completeHandler.
    private final Long persistTimeout;
    // Remote storage abstraction used for every exists/list/open/copy/delete call.
    private final IPersistentIO persistIO;
    // Checkpoint id -> file bookkeeping for checkpoints archived or recovered by this client.
    private final NavigableMap<Long, CheckPointFileInfo> checkPointFileInfo;
    // Runs the upload/download callables.
    private final ExecutorService copyFileService;
    // Runs the remote delete tasks submitted by asyncDeletes.
    private final ExecutorService deleteFileService;
    // Single daemon thread with a one-slot queue; when the slot is taken the oldest queued task is discarded.
    private final ExecutorService backgroundDeleteService = new ThreadPoolExecutor(1, 1, 300L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), (ThreadFactory)new BasicThreadFactory.Builder().namingPattern("asyncDeletes-%d").daemon(true).build(), new ThreadPoolExecutor.DiscardOldestPolicy());

    /**
     * Creates a persist client from the job configuration.
     *
     * <p>Builds the remote IO implementation and the copy/delete executors, and reads the
     * per-file persist timeout (seconds).
     *
     * @param configuration configuration providing the persistent IO settings, thread sizes and timeout
     */
    public RocksdbPersistClient(Configuration configuration) {
        this.persistIO = PersistentIOBuilder.build(configuration);
        this.checkPointFileInfo = new ConcurrentSkipListMap<>();
        int persistThreadNum = configuration.getInteger(FileConfigKeys.PERSISTENT_THREAD_SIZE);
        int persistCleanThreadNum = configuration.getInteger(RocksdbConfigKeys.ROCKSDB_PERSISTENT_CLEAN_THREAD_SIZE);
        // The field is a Long; an integer value cannot be boxed to Long implicitly, so widen first.
        this.persistTimeout = (long) configuration.getInteger(StateConfigKeys.STATE_ROCKSDB_PERSIST_TIMEOUT_SECONDS);
        this.copyFileService = Executors.getExecutorService(1, persistThreadNum, "persist-%d");
        // Use the named constant instead of a bare 64 (same value).
        this.deleteFileService = Executors.getService(persistCleanThreadNum, DELETE_CAPACITY, 300L, TimeUnit.SECONDS);
        // Delete tasks are best-effort: when the pool is saturated the oldest pending one is dropped.
        ((ThreadPoolExecutor) this.deleteFileService).setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * Drops all in-memory checkpoint file bookkeeping held by this client.
     */
    public void clearFileInfo() {
        this.checkPointFileInfo.clear();
    }

    /**
     * Extracts the numeric index from an sst file name, i.e. the part before the first {@code '.'}.
     *
     * @param filename file name such as {@code 000123.sst}
     * @return the parsed index, or {@code 0} when the name is null, has no dot, or the prefix is not a number
     */
    public long getSstIndex(String filename) {
        try {
            return Long.parseLong(filename.substring(0, filename.indexOf('.')));
        } catch (RuntimeException ignored) {
            // Covers NumberFormatException, StringIndexOutOfBoundsException (no dot) and a null name.
            // JVM Errors are deliberately no longer swallowed here.
            LOGGER.warn("filename {} is abnormal", filename);
            return 0L;
        }
    }

    /**
     * Parses the checkpoint id out of a meta directory name: everything after the first
     * {@code '.'} is read as a decimal long.
     *
     * @param fileName meta directory name
     * @return the numeric part following the first dot
     * @throws NumberFormatException if that part is not a valid long
     */
    private long getMetaFileId(String fileName) {
        int dotIndex = fileName.indexOf('.');
        String idPart = fileName.substring(dotIndex + 1);
        return Long.parseLong(idPart);
    }

    /**
     * Builds the meta directory name for a checkpoint.
     *
     * @param chkId checkpoint id
     * @return {@code "meta."} followed by the decimal checkpoint id
     */
    private static String getMetaFileName(long chkId) {
        return "meta.".concat(Long.toString(chkId));
    }

    /**
     * Uploads a local checkpoint directory to remote storage.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>determine the sst files already known from the previous checkpoint;</li>
     *   <li>write a {@code FILES} listing of all local sst names into the local checkpoint directory;</li>
     *   <li>upload sst files not in the previous set to {@code remotePath/datas};</li>
     *   <li>upload every non-sst file (including {@code FILES}) to {@code remotePath/meta.<chkId>};</li>
     *   <li>create the {@code _commit} marker in the meta directory;</li>
     *   <li>record the checkpoint and schedule cleanup of older local/remote data in the background.</li>
     * </ol>
     *
     * @param chkId             checkpoint id being archived
     * @param localChkPath      local checkpoint directory
     * @param remotePath        remote root directory
     * @param keepCheckpointNum passed to the cleanup step as {@code chkId - keepCheckpointNum}
     * @throws Exception if listing, writing or uploading fails
     */
    public void archive(long chkId, String localChkPath, String remotePath, long keepCheckpointNum) throws Exception {
        Set<String> lastFullFiles = this.getLastFullFiles(chkId, localChkPath, remotePath);
        CheckPointFileInfo currentFileInfo = new CheckPointFileInfo(chkId);
        List<Callable<Long>> callers = new ArrayList<>();
        File localChkFile = new File(localChkPath);

        String[] sstFileNames = localChkFile.list((dir, name) -> name.endsWith(SST_SUFFIX));
        // File.list returns null when the directory is missing or unreadable; fail with a clear message.
        Preconditions.checkNotNull(sstFileNames, localChkPath + " is null");
        FileUtils.write(FileUtils.getFile(localChkFile, FILES), Joiner.on(FILE_SEPARATOR).join(sstFileNames), Charset.defaultCharset());

        long size = 0L;
        String dataPath = Paths.get(remotePath, DATAS).toString();
        for (String sstFileName : sstFileNames) {
            currentFileInfo.addFullFile(sstFileName);
            if (lastFullFiles.contains(sstFileName)) {
                // Already known from the previous checkpoint: no need to upload again.
                continue;
            }
            currentFileInfo.addIncDataFile(sstFileName);
            File tmp = FileUtils.getFile(localChkFile, sstFileName);
            callers.add(this.copyFromLocal(new Path(tmp.getAbsolutePath()), new Path(dataPath, sstFileName), tmp.length()));
            size += tmp.length();
        }

        // Listed after FILES has been written so that the listing itself is uploaded as meta data.
        String[] metaFileNames = localChkFile.list((dir, name) -> !name.endsWith(SST_SUFFIX));
        Preconditions.checkNotNull(metaFileNames, localChkPath + " is null");
        String metaPath = Paths.get(remotePath, RocksdbPersistClient.getMetaFileName(chkId)).toString();
        for (String metaFileName : metaFileNames) {
            File tmp = FileUtils.getFile(localChkFile, metaFileName);
            callers.add(this.copyFromLocal(new Path(tmp.getAbsolutePath()), new Path(metaPath, metaFileName), tmp.length()));
            size += tmp.length();
        }

        LOGGER.info("checkpointId {}, full {}, lastFullFiles {}, currentIncre {}", new Object[]{chkId, Arrays.toString(sstFileNames), lastFullFiles, currentFileInfo.getIncDataFiles()});
        long startTime = System.nanoTime();
        this.completeHandler(callers, this.copyFileService);
        callers.clear();
        // The commit marker is created only after every upload succeeded.
        this.persistIO.createNewFile(new Path(metaPath, COMMIT_TAG_FILE));
        double costMs = (double) (System.nanoTime() - startTime) / 1000000.0;
        LOGGER.info("RocksDB {} archive local:{} to {} (incre[{}]/full[{}]) took {}ms. incre data size {}KB, speed {}KB/s {}", new Object[]{this.persistIO.getPersistentType(), localChkFile.getAbsolutePath(), remotePath, currentFileInfo.getIncDataFiles().size(), currentFileInfo.getFullDataFiles().size(), costMs, size / 1024L, (double) (size * 1000L) / (1024.0 * costMs), currentFileInfo.getIncDataFiles().toString()});
        this.checkPointFileInfo.put(chkId, currentFileInfo);
        this.backgroundDeleteService.execute(() -> this.cleanLocalAndRemoteFiles(chkId, remotePath, keepCheckpointNum, localChkFile));
    }

    /**
     * Finds the newest committed checkpoint under a remote root.
     *
     * <p>Entries whose name starts with {@code meta} are parsed for their id; ids greater than
     * zero are tried from highest to lowest, and the first one whose meta directory contains the
     * {@code _commit} marker is returned.
     *
     * @param remotePathStr remote root directory
     * @return the newest committed checkpoint id, or {@code -1} if the root does not exist or no
     *         committed checkpoint is found
     * @throws GeaflowRuntimeException if the remote storage access fails with an {@link IOException}
     */
    public long getLatestCheckpointId(String remotePathStr) {
        try {
            Path remoteRoot = new Path(remotePathStr);
            if (!this.persistIO.exists(remoteRoot)) {
                return -1L;
            }
            List<String> remoteNames = this.persistIO.listFile(remoteRoot);
            List<Long> chkIds = new ArrayList<>();
            for (String remoteName : remoteNames) {
                if (!remoteName.startsWith(META)) {
                    continue;
                }
                long metaId = this.getMetaFileId(remoteName);
                if (metaId > 0L) {
                    chkIds.add(metaId);
                }
            }
            // Newest checkpoint first.
            chkIds.sort(Collections.reverseOrder());
            LOGGER.info("find available chk {}", chkIds);
            for (Long candidate : chkIds) {
                String commitTag = Paths.get(remotePathStr, RocksdbPersistClient.getMetaFileName(candidate), COMMIT_TAG_FILE).toString();
                if (this.persistIO.exists(new Path(commitTag))) {
                    return candidate;
                }
                LOGGER.info("chk {} has no path {}", candidate, commitTag);
            }
            return -1L;
        } catch (IOException e) {
            throw new GeaflowRuntimeException(RuntimeErrors.INST.stateRocksDbError("recover fail"), e);
        }
    }

    /**
     * Restores a checkpoint from remote storage into fresh local directories.
     *
     * <p>Both local directories are deleted and recreated, the checkpoint's {@code FILES} listing
     * is read from {@code remotePathStr/meta.<chkId>}, the listed sst files and all meta files are
     * downloaded into {@code localChkPath}, and every downloaded file is then hard-linked into
     * {@code localRdbPath}. Finally, cleanup of older local checkpoint directories is scheduled in
     * the background.
     *
     * @param chkId         checkpoint id to restore
     * @param localRdbPath  local RocksDB directory (recreated, then filled with hard links)
     * @param localChkPath  local checkpoint directory (recreated, then filled with downloads)
     * @param remotePathStr remote root directory
     * @throws GeaflowRuntimeException if the remote root does not exist or a download fails
     * @throws Exception               on other IO failures
     */
    public void recover(long chkId, String localRdbPath, String localChkPath, String remotePathStr) throws Exception {
        this.checkPointFileInfo.clear();
        File rocksDBChkFile = new File(localChkPath);
        File rocksDBFile = new File(localRdbPath);
        LOGGER.info("delete {} {}", localChkPath, localRdbPath);
        FileUtils.deleteQuietly(rocksDBChkFile);
        FileUtils.deleteQuietly(rocksDBFile);
        rocksDBChkFile.mkdirs();
        rocksDBFile.mkdirs();
        long startTime = System.currentTimeMillis();
        Path remotePath = new Path(remotePathStr);
        if (!this.persistIO.exists(remotePath)) {
            String msg = String.format("checkPoint: %s is not exist in remote", remotePath);
            LOGGER.warn(msg);
            throw new GeaflowRuntimeException(RuntimeErrors.INST.stateRocksDbError(msg));
        }
        String remoteMeta = Paths.get(remotePathStr, RocksdbPersistClient.getMetaFileName(chkId)).toString();
        String sstString;
        // Close the remote stream as soon as the listing has been read.
        try (InputStream in = this.persistIO.open(new Path(remoteMeta, FILES))) {
            sstString = IOUtils.toString(in, Charset.defaultCharset());
        }
        List<String> list = Splitter.on(FILE_SEPARATOR).omitEmptyStrings().splitToList(sstString);
        CheckPointFileInfo commitedInfo = new CheckPointFileInfo(chkId);
        this.recoveryData(remotePath, rocksDBChkFile, commitedInfo, list, remoteMeta);
        LOGGER.info("recoveryFromRemote {} cost {}ms", remotePath, System.currentTimeMillis() - startTime);
        this.checkPointFileInfo.put(chkId, commitedInfo);
        File[] recoveredFiles = rocksDBChkFile.listFiles();
        // listFiles returns null if the directory vanished or cannot be read.
        Preconditions.checkNotNull(recoveredFiles, localChkPath + " is null");
        for (File file : recoveredFiles) {
            Files.createLink(FileSystems.getDefault().getPath(localRdbPath, file.getName()), file.toPath());
        }
        this.backgroundDeleteService.execute(() -> RocksdbPersistClient.cleanLocalChk(chkId, new File(localChkPath)));
    }

    /**
     * Deletes sibling local checkpoint directories that are older than {@code chkId}.
     *
     * <p>A sibling qualifies when {@link RocksdbConfigKeys#isChkPath} accepts its name, the name
     * starts with the prefix derived from {@code localChkFile}'s own name, and the checkpoint id
     * encoded in it is smaller than {@code chkId}. Deletion is best-effort.
     *
     * @param chkId        checkpoint id to keep; only strictly older ones are removed
     * @param localChkFile the current local checkpoint directory
     */
    private static void cleanLocalChk(long chkId, File localChkFile) {
        String chkPrefix = RocksdbConfigKeys.getChkPathPrefix(localChkFile.getName());
        FilenameFilter olderChkFilter = (dir, name) ->
            RocksdbConfigKeys.isChkPath(name)
                && name.startsWith(chkPrefix)
                && chkId > RocksdbConfigKeys.getChkIdFromChkPath(name);
        File parentDir = localChkFile.getParentFile();
        // getParentFile is null for a path without a parent; listFiles is null when the directory
        // is missing or unreadable. This runs on a background thread, so skip instead of throwing.
        File[] subFiles = parentDir == null ? null : parentDir.listFiles(olderChkFilter);
        if (subFiles == null) {
            LOGGER.warn("cannot list parent of local chk {}, skip cleaning", localChkFile);
            return;
        }
        for (File path : subFiles) {
            LOGGER.info("delete local chk {}", path.toURI());
            FileUtils.deleteQuietly(path);
        }
    }

    /**
     * Returns the full file set of the checkpoint preceding {@code chkId}.
     *
     * <p>Lookup order: the in-memory entry for {@code chkId} itself, then the closest lower
     * in-memory entry, then the {@code FILES} listing of the newest remote meta directory below
     * {@code chkId}. If none is found the result is empty.
     *
     * <p>As a sanity check, the largest sst index in the local checkpoint directory must not be
     * smaller than the largest sst index of the previous set.
     *
     * @param chkId        checkpoint id being archived
     * @param localChkPath local checkpoint directory
     * @param remotePath   remote root directory
     * @return a new mutable set with the previous checkpoint's file names (possibly empty)
     * @throws IOException              if remote access fails
     * @throws NullPointerException     if the local directory cannot be listed
     * @throws IllegalArgumentException if the sst index sanity check fails
     */
    private Set<String> getLastFullFiles(long chkId, String localChkPath, String remotePath) throws IOException {
        CheckPointFileInfo commitFileInfo = this.checkPointFileInfo.get(chkId);
        if (commitFileInfo == null) {
            Map.Entry<Long, CheckPointFileInfo> previous = this.checkPointFileInfo.lowerEntry(chkId);
            if (previous != null) {
                commitFileInfo = previous.getValue();
            } else {
                // Nothing in memory: fall back to what the remote storage knows.
                Path remoteRoot = new Path(remotePath);
                PathFilter metaFilter = candidate -> candidate.getName().startsWith(META);
                if (this.persistIO.exists(remoteRoot)) {
                    FileInfo[] metaFileStatuses = this.persistIO.listStatus(remoteRoot, metaFilter);
                    Path lastMetaPath = this.getLastMetaFile(chkId, metaFileStatuses);
                    if (lastMetaPath != null) {
                        commitFileInfo = new CheckPointFileInfo(chkId);
                        commitFileInfo.addFullFiles(this.getKeptFileName(lastMetaPath));
                    }
                }
            }
        }

        Set<String> lastFullFiles = new HashSet<>();
        if (commitFileInfo != null) {
            lastFullFiles.addAll(commitFileInfo.getFullDataFiles());
        }

        String[] curNames = new File(localChkPath).list();
        Preconditions.checkNotNull(curNames, localChkPath + " is null");
        Optional<Long> chkLargestSst = Arrays.stream(curNames)
            .filter(name -> name.endsWith(SST_SUFFIX))
            .map(this::getSstIndex)
            .max(Long::compareTo);
        Optional<Long> lastLargestSst = lastFullFiles.stream()
            .filter(name -> name.endsWith(SST_SUFFIX))
            .map(this::getSstIndex)
            .max(Long::compareTo);
        if (chkLargestSst.isPresent() && lastLargestSst.isPresent()) {
            long currentMax = chkLargestSst.get();
            long previousMax = lastLargestSst.get();
            Preconditions.checkArgument(currentMax >= previousMax, "%s < %s, chk path %s, check FO and recovery.", chkLargestSst.get(), lastLargestSst.get(), localChkPath);
        }
        return lastFullFiles;
    }

    /**
     * Background cleanup run after an archive: removes remote data older than
     * {@code chkId - keepCheckpointNum}, forgets in-memory entries below {@code chkId}, and
     * deletes older local checkpoint directories.
     *
     * <p>An {@link IOException} from the remote cleanup is logged and otherwise ignored.
     *
     * @param chkId             checkpoint id that was just archived
     * @param remotePath        remote root directory
     * @param keepCheckpointNum offset subtracted from {@code chkId} for the remote cleanup
     * @param localChkFile      the current local checkpoint directory
     */
    private void cleanLocalAndRemoteFiles(long chkId, String remotePath, long keepCheckpointNum, File localChkFile) {
        long earliestKept = chkId - keepCheckpointNum;
        try {
            this.removeEarlyChk(remotePath, earliestKept);
        } catch (IOException ignore) {
            LOGGER.warn("remove Early chk fail and ignore {}, chkId {}, keepChkNum {}", new Object[]{remotePath, chkId, keepCheckpointNum});
        }
        // Drop every bookkeeping entry strictly below the current checkpoint, one key at a time.
        for (Long stale = this.checkPointFileInfo.lowerKey(chkId); stale != null; stale = this.checkPointFileInfo.lowerKey(chkId)) {
            this.checkPointFileInfo.remove(stale);
        }
        RocksdbPersistClient.cleanLocalChk(chkId, localChkFile);
    }

    /**
     * Schedules deletion of remote files that are no longer needed by checkpoints at or above
     * {@code chkId}.
     *
     * <p>The newest meta directory below {@code chkId} acts as the reference: its {@code FILES}
     * listing names the sst files to keep and the modification time of its {@code _commit}
     * marker is the age cut-off. Nothing is deleted when no such meta directory exists or when
     * the listing yields an empty set.
     *
     * @param remotePath remote root directory
     * @param chkId      checkpoints with an id below this value are candidates for removal
     * @throws IOException if listing or reading the remote meta data fails
     */
    private void removeEarlyChk(String remotePath, long chkId) throws IOException {
        long begin = System.currentTimeMillis();
        LOGGER.info("skip remove early chk {} {}", remotePath, chkId);

        FileInfo[] sstFileStatuses;
        try {
            sstFileStatuses = this.persistIO.listStatus(new Path(remotePath, DATAS));
        } catch (Exception e) {
            // A missing data directory is tolerated: continue with no sst candidates.
            LOGGER.warn("{} do not have data, just ignore", remotePath);
            sstFileStatuses = new FileInfo[0];
        }

        PathFilter metaFilter = candidate -> candidate.getName().startsWith(META);
        FileInfo[] metaFileStatuses = this.persistIO.listStatus(new Path(remotePath), metaFilter);
        Path delMetaPath = this.getLastMetaFile(chkId, metaFileStatuses);
        if (delMetaPath == null) {
            return;
        }

        Set<String> toBeKepts = this.getKeptFileName(delMetaPath);
        if (toBeKepts.isEmpty()) {
            return;
        }

        FileInfo commitTagInfo = this.persistIO.getFileInfo(new Path(delMetaPath, COMMIT_TAG_FILE));
        long chkPointTime = commitTagInfo.getModificationTime();
        LOGGER.info("remotePath {}, chkId: {}, chkPointTime {}, toBeKepts: {}", new Object[]{remotePath, chkId, new Date(chkPointTime), toBeKepts});

        List<Path> paths = this.getDelPaths(chkId, chkPointTime, sstFileStatuses, metaFileStatuses, toBeKepts);
        String deletedNames = paths.stream().map(Path::getName).collect(Collectors.joining(FILE_SEPARATOR));
        LOGGER.info("RocksDB({}) clean dfs checkpoint: ({}) took {}ms", new Object[]{chkId, deletedNames, System.currentTimeMillis() - begin});
        this.asyncDeletes(paths);
    }

    /**
     * Collects the remote paths that can be deleted.
     *
     * <p>An sst file is selected when it was modified before {@code chkPointTime} and is not in
     * {@code toBeKepts}. A meta directory is selected when its version is below {@code chkId}.
     *
     * @param chkId            meta directories with a lower version are selected
     * @param chkPointTime     modification-time cut-off for sst files
     * @param sstFileStatuses  remote sst file entries
     * @param metaFileStatuses remote meta directory entries
     * @param toBeKepts        sst file names that must survive
     * @return sst paths first, then meta paths
     */
    private List<Path> getDelPaths(long chkId, long chkPointTime, FileInfo[] sstFileStatuses, FileInfo[] metaFileStatuses, Set<String> toBeKepts) {
        List<Path> paths = new ArrayList<>();
        // Only used for the summary log line below.
        Set<String> toBeDels = new HashSet<>();

        for (FileInfo sstStatus : sstFileStatuses) {
            boolean olderThanChk = sstStatus.getModificationTime() < chkPointTime;
            if (olderThanChk && !toBeKepts.contains(sstStatus.getPath().getName())) {
                toBeDels.add(sstStatus.getPath().getName());
                paths.add(sstStatus.getPath());
                LOGGER.info("delete file: {} time: {}", sstStatus.getPath(), new Date(sstStatus.getModificationTime()));
            }
        }
        LOGGER.info("kepts: {}, dels: {} ", toBeKepts, toBeDels);

        for (FileInfo metaStatus : metaFileStatuses) {
            long chkVersion = this.getChkVersion(metaStatus.getPath().getName());
            if (chkVersion < chkId) {
                paths.add(metaStatus.getPath());
            }
        }
        return paths;
    }

    /**
     * Picks the meta directory with the highest version that is strictly below {@code chkId}.
     * Versions that are not greater than zero are never selected.
     *
     * @param chkId            exclusive upper bound for the version
     * @param metaFileStatuses remote meta directory entries
     * @return the path of the selected meta directory, or {@code null} if none qualifies
     */
    private Path getLastMetaFile(long chkId, FileInfo[] metaFileStatuses) {
        FileInfo newest = null;
        int newestVersion = 0;
        for (FileInfo candidate : metaFileStatuses) {
            int version = this.getChkVersion(candidate.getPath().getName());
            if ((long) version < chkId && version > newestVersion) {
                newestVersion = version;
                newest = candidate;
            }
        }
        // newest is set exactly when newestVersion moved above its initial 0.
        return newest == null ? null : newest.getPath();
    }

    /**
     * Reads the {@code FILES} listing inside a remote meta directory and returns its entries.
     *
     * <p>The content is split on {@code ","} without dropping empty tokens, so an empty listing
     * yields a set containing the empty string.
     *
     * @param metaPath remote meta directory
     * @return a new mutable set of the listed file names
     * @throws IOException if the listing cannot be opened or read
     */
    private Set<String> getKeptFileName(Path metaPath) throws IOException {
        Path filesPath = new Path(metaPath, FILES);
        String sstString;
        // try-with-resources: the remote stream was previously never closed.
        try (InputStream in = this.persistIO.open(filesPath)) {
            sstString = IOUtils.toString(in, Charset.defaultCharset());
        }
        return Sets.newHashSet(Splitter.on(FILE_SEPARATOR).split(sstString));
    }

    /**
     * Parses the version out of a meta directory name: everything after the first {@code '.'}
     * is read as a decimal int.
     *
     * @param filename meta directory name
     * @return the numeric part following the first dot
     * @throws NumberFormatException if that part is not a valid int
     */
    private int getChkVersion(String filename) {
        int dotIndex = filename.indexOf('.');
        String versionPart = filename.substring(dotIndex + 1);
        return Integer.parseInt(versionPart);
    }

    /**
     * Submits all callables to the executor and waits for each result in submission order.
     *
     * <p>Each future is awaited for at most {@code persistTimeout} seconds. On the first failure
     * (timeout, execution error, interruption) the futures that have not started yet are
     * cancelled, the interrupt flag is restored if applicable, and a
     * {@link GeaflowRuntimeException} wrapping the cause is thrown.
     *
     * @param callers         tasks to run
     * @param executorService executor to run them on
     * @param <T>             task result type
     * @return the results, in the order of {@code callers}
     * @throws GeaflowRuntimeException if any task fails, times out, or the wait is interrupted
     */
    private <T> List<T> completeHandler(List<Callable<T>> callers, ExecutorService executorService) {
        List<Future<T>> futures = new ArrayList<>(callers.size());
        for (Callable<T> callable : callers) {
            futures.add(executorService.submit(callable));
        }
        List<T> results = new ArrayList<>(futures.size());
        try {
            for (Future<T> future : futures) {
                results.add(future.get(this.persistTimeout, TimeUnit.SECONDS));
            }
        } catch (Exception e) {
            // The batch has failed: do not let still-queued copies run for nothing.
            // cancel(false) leaves in-flight transfers alone rather than interrupting remote IO.
            for (Future<T> future : futures) {
                future.cancel(false);
            }
            if (e instanceof InterruptedException) {
                // Preserve the interruption for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new GeaflowRuntimeException(RuntimeErrors.INST.stateRocksDbError("persist time out or other exceptions"), e);
        }
        return results;
    }

    /**
     * Compares the size of a remote file with the length of a local file.
     *
     * @param dfsPath   remote file
     * @param localPath local file
     * @return a tuple of (sizes are equal, remote size in bytes)
     * @throws IOException if the remote size cannot be obtained
     */
    private Tuple<Boolean, Long> checkSizeSame(Path dfsPath, Path localPath) throws IOException {
        long remoteLength = this.persistIO.getRemoteFileSize(dfsPath);
        long localLength = new File(localPath.toString()).length();
        // Box a real boolean: a "cond ? 1 : 0" expression would put an Integer into the Boolean slot.
        return Tuple.of(remoteLength == localLength, remoteLength);
    }

    /**
     * Creates a task that uploads a local file and verifies the uploaded size.
     *
     * <p>The task makes up to three attempts in total; an attempt counts as failed when the
     * copy throws an {@link IOException} or when the remote size differs from the local one.
     * When the third attempt fails a {@link GeaflowRuntimeException} is thrown.
     *
     * @param from local source file
     * @param to   remote destination
     * @param size size in bytes, used only for logging
     * @return a callable yielding the remote file size after a verified upload
     */
    private Callable<Long> copyFromLocal(Path from, Path to, long size) {
        return () -> {
            final int maxTries = 3;
            int attempt = 0;
            while (true) {
                attempt++;
                try {
                    long start = System.currentTimeMillis();
                    this.persistIO.copyFromLocalFile(from, to);
                    Tuple<Boolean, Long> checkRes = this.checkSizeSame(to, from);
                    if (checkRes.f0) {
                        LOGGER.info("upload to dfs size {}KB took {}ms {} -> {}", new Object[]{size / 1024L, System.currentTimeMillis() - start, from, to});
                        return checkRes.f1;
                    }
                    LOGGER.warn("upload to dfs size not same {} -> {}", from, to);
                    if (attempt == maxTries) {
                        throw new GeaflowRuntimeException(RuntimeErrors.INST.stateRocksDbError("upload to dfs size not same"));
                    }
                } catch (IOException ex) {
                    // IO failures share the same attempt budget as size mismatches.
                    if (attempt == maxTries) {
                        throw new GeaflowRuntimeException(RuntimeErrors.INST.stateRocksDbError("upload to dfs exception"), ex);
                    }
                }
            }
        };
    }

    /**
     * Creates a task that downloads a remote file and verifies the downloaded size.
     *
     * <p>The task makes up to three attempts in total; an attempt counts as failed when the
     * copy throws an {@link IOException} or when the local length differs from the remote size.
     * When the third attempt fails a {@link GeaflowRuntimeException} is thrown.
     *
     * @param from remote source file
     * @param to   local destination
     * @return a callable yielding the remote file size after a verified download
     */
    private Callable<Long> copyToLocal(Path from, Path to) {
        return () -> {
            final int maxTries = 3;
            int attempt = 0;
            while (true) {
                attempt++;
                try {
                    this.persistIO.copyToLocalFile(from, to);
                    Tuple<Boolean, Long> checkRes = this.checkSizeSame(from, to);
                    if (checkRes.f0) {
                        LOGGER.info("download from dfs {} -> {}", from, to);
                        return checkRes.f1;
                    }
                    LOGGER.warn("download from dfs size not same {} -> {}", from, to);
                    if (attempt == maxTries) {
                        String msg = "download from dfs size not same: " + from;
                        throw new GeaflowRuntimeException(RuntimeErrors.INST.stateRocksDbError(msg));
                    }
                } catch (IOException ex) {
                    // IO failures share the same attempt budget as size mismatches.
                    if (attempt == maxTries) {
                        throw new GeaflowRuntimeException(RuntimeErrors.INST.stateRocksDbError("copy from dfs exception"), ex);
                    }
                }
            }
        };
    }

    /**
     * Hands the given remote paths to the delete executor, which removes them recursively one
     * after another. A failing delete is logged and does not stop the remaining ones.
     *
     * @param paths remote paths to delete
     */
    private void asyncDeletes(List<Path> paths) {
        Runnable deleteTask = () -> {
            long batchStart = System.currentTimeMillis();
            for (Path target : paths) {
                try {
                    long singleStart = System.nanoTime();
                    this.persistIO.delete(target, true);
                    long elapsedUs = (System.nanoTime() - singleStart) / 1000L;
                    LOGGER.info("async Delete path {} cost {}us", target, elapsedUs);
                } catch (IOException e) {
                    LOGGER.warn("delete fail", e);
                }
            }
            long elapsedMs = System.currentTimeMillis() - batchStart;
            LOGGER.info("asyncDeletes path {} cost {}ms", paths, elapsedMs);
        };
        this.deleteFileService.execute(deleteTask);
    }

    /**
     * Downloads a checkpoint's files into the local checkpoint directory.
     *
     * <p>The listed sst files are fetched from {@code remotePath/datas} and every entry of the
     * remote meta directory from {@code remoteMeta}. Afterwards the sst files found locally are
     * recorded as the full file set of {@code committedInfo}.
     *
     * @param remotePath    remote root directory
     * @param localChkFile  local checkpoint directory receiving the downloads
     * @param committedInfo bookkeeping object to fill with the recovered sst names
     * @param list          sst file names to download
     * @param remoteMeta    remote meta directory of the checkpoint
     * @return total downloaded size in KB
     * @throws Exception if listing or downloading fails
     */
    private long recoveryData(Path remotePath, File localChkFile, CheckPointFileInfo committedInfo, List<String> list, String remoteMeta) throws Exception {
        LOGGER.info("recoveryData {} list {}", remotePath, list);
        List<Callable<Long>> callers = new ArrayList<>();
        for (String sstName : list) {
            Path remoteSst = new Path(Paths.get(remotePath.toString(), DATAS, sstName).toString());
            callers.add(this.copyToLocal(remoteSst, new Path(localChkFile.getAbsolutePath(), sstName)));
        }
        List<String> metaList = this.persistIO.listFile(new Path(remoteMeta));
        for (String metaName : metaList) {
            callers.add(this.copyToLocal(new Path(remoteMeta, metaName), new Path(localChkFile.getAbsolutePath(), metaName)));
        }
        long start = System.currentTimeMillis();
        List<Long> res = this.completeHandler(callers, this.copyFileService);
        long size = res.stream().mapToLong(Long::longValue).sum() / 1024L;
        // +1 avoids a division by zero for sub-millisecond recoveries.
        long speed = 1000L * size / (System.currentTimeMillis() - start + 1L);
        // size is already in KB; it used to be divided by 1024 a second time for this "KB" field.
        LOGGER.info("RocksDB {} copy ({} to local:{}) lastCommitInfo:{}. size: {}KB, speed: {}KB/s", new Object[]{this.persistIO.getPersistentType(), remotePath, localChkFile, committedInfo, size, speed});
        String[] localChkFiles = localChkFile.list((dir, name) -> name.endsWith(SST_SUFFIX));
        if (localChkFiles != null) {
            for (String chkFile : localChkFiles) {
                committedInfo.addFullFile(chkFile);
            }
        } else {
            // The directory could not be listed: acceptable only if nothing had to be fetched.
            Preconditions.checkArgument(list.isEmpty(), (Object) "sst is not fetched.");
        }
        return size;
    }

    /**
     * File bookkeeping for one checkpoint: its id, the files uploaded incrementally for it, and
     * the complete set of files it consists of.
     *
     * <p>The getters return the internal mutable sets, not copies.
     */
    public static class CheckPointFileInfo {
        private final long checkPointId;
        private final Set<String> incDataFiles = new HashSet<String>();
        private final Set<String> fullDataFiles = new HashSet<String>();

        /**
         * @param checkPointId id of the checkpoint this object describes
         */
        public CheckPointFileInfo(long checkPointId) {
            this.checkPointId = checkPointId;
        }

        /** @return the checkpoint id given at construction */
        public long getCheckPointId() {
            return this.checkPointId;
        }

        /** Adds one file name to the incremental set. */
        public void addIncDataFile(String name) {
            this.incDataFiles.add(name);
        }

        /** Adds one file name to the full set. */
        public void addFullFile(String name) {
            this.fullDataFiles.add(name);
        }

        /** Adds all given file names to the full set. */
        public void addFullFiles(Collection<String> name) {
            this.fullDataFiles.addAll(name);
        }

        @Override
        public String toString() {
            return String.format("CheckPointFileInfo [checkPointId=%d, incDataFiles=%s, fullDataFiles=%s]", this.checkPointId, this.incDataFiles, this.fullDataFiles);
        }

        /** @return the live set of incrementally uploaded file names */
        public Set<String> getIncDataFiles() {
            return this.incDataFiles;
        }

        /** @return the live set of all file names of the checkpoint */
        public Set<String> getFullDataFiles() {
            return this.fullDataFiles;
        }
    }
}

