/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChangelogCompactTask
implements Serializable {
    // Class-wide SLF4J logger; only used for debug traces of the copy and rename steps.
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogCompactTask.class);
    // Checkpoint id stamped on every Committable produced by this task.
    private final long checkpointId;
    // Partition used to resolve data file paths and passed to each new commit message.
    private final BinaryRow partition;
    // Passed through unchanged to every CommitMessageImpl this task creates.
    private final int totalBuckets;
    // Bucket -> changelog files; read with isCompactResult = false and re-emitted in the DataIncrement.
    private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
    // Bucket -> changelog files; read with isCompactResult = true and re-emitted in the CompactIncrement.
    private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;

    /**
     * Creates a compaction task for the changelog files of one partition.
     *
     * @param checkpointId checkpoint id attached to the committables this task produces
     * @param partition partition the changelog files belong to
     * @param totalBuckets value forwarded to every commit message created by this task
     * @param newFileChangelogFiles bucket to changelog files, handled as non-compaction results
     * @param compactChangelogFiles bucket to changelog files, handled as compaction results
     * @throws IllegalArgumentException if both maps are non-empty
     */
    public ChangelogCompactTask(
            long checkpointId,
            BinaryRow partition,
            int totalBuckets,
            Map<Integer, List<DataFileMeta>> newFileChangelogFiles,
            Map<Integer, List<DataFileMeta>> compactChangelogFiles) {
        // At most one of the two maps may carry files.
        boolean bothPopulated = !newFileChangelogFiles.isEmpty() && !compactChangelogFiles.isEmpty();
        Preconditions.checkArgument(
                !bothPopulated,
                "Both newFileChangelogFiles and compactChangelogFiles are not empty. There is no such table where changelog is produced both from new files and from compaction. This is unexpected.");

        this.checkpointId = checkpointId;
        this.partition = partition;
        this.totalBuckets = totalBuckets;
        this.newFileChangelogFiles = newFileChangelogFiles;
        this.compactChangelogFiles = compactChangelogFiles;
    }

    /** Returns the checkpoint id this task was created with. */
    public long checkpointId() {
        return checkpointId;
    }

    /** Returns the partition this task was created with. */
    public BinaryRow partition() {
        return partition;
    }

    /** Returns the total bucket count this task was created with. */
    public int totalBuckets() {
        return totalBuckets;
    }

    /**
     * Returns the bucket-to-files map of non-compaction changelog files. The internal map
     * instance is returned as is, without copying.
     */
    public Map<Integer, List<DataFileMeta>> newFileChangelogFiles() {
        return newFileChangelogFiles;
    }

    /**
     * Returns the bucket-to-files map of compaction changelog files. The internal map instance
     * is returned as is, without copying.
     */
    public Map<Integer, List<DataFileMeta>> compactChangelogFiles() {
        return compactChangelogFiles;
    }

    /**
     * Reads every changelog file of this task through {@code executor}, appends the bytes to a
     * single temporary file and turns the result into new committables.
     *
     * <p>Reader tasks acquire {@code fileSize} permits from a semaphore sized to
     * {@code bufferSize} before reading; the calling thread writes each finished file and then
     * releases its permits, which bounds the bytes that are read but not yet written.
     *
     * @param table table whose file IO and path factory are used
     * @param executor executor running the file reads
     * @param bufferSize upper bound for the bytes held in memory; must fit into an {@code int}
     * @return one committable per bucket that had files, or an empty list if this task has no files
     * @throws IllegalArgumentException if {@code bufferSize} exceeds {@link Integer#MAX_VALUE}
     *     bytes or a single file is larger than {@code bufferSize}
     * @throws Exception the failure of a read task, or any failure while writing or renaming
     */
    public List<Committable> doCompact(FileStoreTable table, ExecutorService executor, MemorySize bufferSize) throws Exception {
        // The message templates use %s because Paimon's Preconditions only substitutes %s
        // placeholders; the previous "{}" markers were never filled in.
        Preconditions.checkArgument(
                bufferSize.getBytes() <= Integer.MAX_VALUE,
                "Changelog pre-commit compaction buffer size (%s bytes) too large! The maximum possible value is %s bytes.",
                bufferSize.getBytes(),
                Integer.MAX_VALUE);
        FileStorePathFactory pathFactory = table.store().pathFactory();

        List<ReadTask> tasks = new ArrayList<>();
        BiConsumer<Map<Integer, List<DataFileMeta>>, Boolean> addTasks = (files, isCompactResult) -> {
            for (Map.Entry<Integer, List<DataFileMeta>> entry : files.entrySet()) {
                int bucket = entry.getKey();
                DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(this.partition, bucket);
                for (DataFileMeta meta : entry.getValue()) {
                    // A file larger than the whole buffer could never acquire its permits.
                    Preconditions.checkArgument(
                            meta.fileSize() <= bufferSize.getBytes(),
                            "Trying to compact changelog file with size %s bytes, while the buffer size is only %s bytes. This is unexpected.",
                            meta.fileSize(),
                            bufferSize.getBytes());
                    tasks.add(new ReadTask(table, dataFilePathFactory.toPath(meta), bucket, isCompactResult, meta));
                }
            }
        };
        addTasks.accept(this.newFileChangelogFiles, false);
        addTasks.accept(this.compactChangelogFiles, true);

        // Nothing to read means no output stream is ever opened, so there is nothing to close,
        // rename or commit.
        if (tasks.isEmpty()) {
            return new ArrayList<>();
        }

        Semaphore semaphore = new Semaphore((int) bufferSize.getBytes());
        LinkedBlockingQueue<ReadTask> finishedTasks = new LinkedBlockingQueue<>();
        ThreadPoolUtils.submitAllTasks(executor, t -> {
            try {
                semaphore.acquire((int) t.meta.fileSize());
                t.readFully();
                finishedTasks.put(t);
            } catch (InterruptedException e) {
                // Hand the failure to the writer loop; otherwise it would wait in take() forever
                // for a task that never arrives. offer() is used because it does not wait
                // interruptibly and the queue is unbounded.
                t.exception = e;
                finishedTasks.offer(t);
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }, tasks);

        OutputStream outputStream = new OutputStream();
        List<Result> results = new ArrayList<>();
        try {
            for (int i = 0; i < tasks.size(); i++) {
                ReadTask task = finishedTasks.take();
                if (task.exception != null) {
                    throw task.exception;
                }
                this.write(task, outputStream, results);
                // The task stays reachable through `tasks`, so drop its buffer before handing
                // the permits back; otherwise released permits would not correspond to freed memory.
                task.result = null;
                semaphore.release((int) task.meta.fileSize());
            }
        } catch (Exception e) {
            // Do not leak the stream or leave a half-written temp file behind.
            // NOTE(review): reader tasks still waiting in semaphore.acquire are neither released
            // nor cancelled here - confirm the caller tears down the executor on failure.
            if (outputStream.isInitialized) {
                try {
                    outputStream.out.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                table.fileIO().deleteQuietly(outputStream.path);
            }
            throw e;
        }
        outputStream.out.close();
        return this.produceNewCommittables(results, table, pathFactory, outputStream.path);
    }

    /**
     * Appends the bytes of a finished read task to the shared output stream and records where
     * they were placed.
     *
     * <p>On the first call the stream is created as a file named
     * {@code tmp-compacted-changelog-<uuid>} in the parent directory of that task's path.
     *
     * @param task finished read task whose {@code result} is written
     * @param outputStream holder of the shared output stream; initialised here if necessary
     * @param results list receiving one {@link Result} with the offset and length of the write
     * @throws Exception if creating the stream or writing to it fails
     */
    private void write(ReadTask task, OutputStream outputStream, List<Result> results) throws Exception {
        if (!outputStream.isInitialized) {
            String tempName = "tmp-compacted-changelog-" + UUID.randomUUID();
            Path tempPath = new Path(task.path.getParent(), tempName);
            outputStream.init(tempPath, task.table.fileIO().newOutputStream(tempPath, false));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Copying bytes from {} to {}", task.path, outputStream.path);
        }

        PositionOutputStream sink = outputStream.out;
        long startOffset = sink.getPos();
        sink.write(task.result);
        long writtenBytes = sink.getPos() - startOffset;
        results.add(new Result(task.bucket, task.isCompactResult, task.meta, startOffset, writtenBytes));
    }

    /**
     * Renames the temporary merged file to its final name and builds one committable per bucket
     * describing the pieces stored inside it.
     *
     * <p>The final name is derived from the first result: a fresh UUID plus that result's bucket
     * and length. Results at offset 0 use that name directly; all others get
     * {@code -<offset>-<length>} appended before the format identifier.
     *
     * @param results placement of every source file inside the merged file, in write order
     * @param table table providing the file IO used for the rename
     * @param pathFactory factory used to resolve the final path
     * @param changelogTempPath path of the temporary merged file
     * @return one committable per bucket present in {@code results}; empty if there are none
     * @throws IllegalArgumentException if the first result does not start at offset 0
     * @throws IOException if the rename fails
     */
    private List<Committable> produceNewCommittables(List<Result> results, FileStoreTable table, FileStorePathFactory pathFactory, Path changelogTempPath) throws IOException {
        // Without results no temp file was written, so there is nothing to rename or commit.
        if (results.isEmpty()) {
            return new ArrayList<>();
        }

        Result baseResult = results.get(0);
        Preconditions.checkArgument(baseResult.offset == 0L);
        DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(this.partition, baseResult.bucket);
        String realName = "compacted-changelog-" + UUID.randomUUID() + "$" + baseResult.bucket + "-" + baseResult.length;
        Path realPath = dataFilePathFactory.toAlignedPath(
                realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier(baseResult.meta.fileFormat()),
                baseResult.meta);
        Path realExternalDir = baseResult.meta.externalPath().map(p -> new Path(p).getParent()).orElse(null);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Rename {} to {} with external dir {}", changelogTempPath, realPath, realExternalDir);
        }
        // The committables below point at realPath, so a rename that reports failure must not
        // be ignored.
        if (!table.fileIO().rename(changelogTempPath, realPath)) {
            throw new IOException("Failed to rename " + changelogTempPath + " to " + realPath);
        }

        Map<Integer, List<Result>> bucketedResults = new HashMap<>();
        for (Result result : results) {
            bucketedResults.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result);
        }

        List<Committable> newCommittables = new ArrayList<>();
        for (Map.Entry<Integer, List<Result>> entry : bucketedResults.entrySet()) {
            List<DataFileMeta> newFilesChangelog = new ArrayList<>();
            List<DataFileMeta> compactChangelog = new ArrayList<>();
            for (Result result : entry.getValue()) {
                String baseName = result.offset == 0L ? realName : realName + "-" + result.offset + "-" + result.length;
                String name = baseName + "." + CompactedChangelogReadOnlyFormat.getIdentifier(result.meta.fileFormat());
                DataFileMeta file = result.meta.rename(name);
                if (realExternalDir != null) {
                    file = file.newExternalPath(new Path(realExternalDir, name).toString());
                }
                if (result.isCompactResult) {
                    compactChangelog.add(file);
                } else {
                    newFilesChangelog.add(file);
                }
            }
            CommitMessageImpl newMessage = new CommitMessageImpl(
                    this.partition,
                    entry.getKey(),
                    this.totalBuckets,
                    new DataIncrement(Collections.emptyList(), Collections.emptyList(), newFilesChangelog),
                    new CompactIncrement(Collections.emptyList(), Collections.emptyList(), compactChangelog));
            newCommittables.add(new Committable(this.checkpointId, Committable.Kind.FILE, newMessage));
        }
        return newCommittables;
    }

    /**
     * Hashes every field that takes part in {@link #equals(Object)}, including
     * {@code totalBuckets}.
     */
    @Override
    public int hashCode() {
        return Objects.hash(checkpointId, partition, totalBuckets, newFileChangelogFiles, compactChangelogFiles);
    }

    /**
     * Two tasks are equal when they are of the same class and their checkpoint id, partition,
     * total bucket count and both file maps are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ChangelogCompactTask that = (ChangelogCompactTask) o;
        // totalBuckets is forwarded into every commit message, so tasks differing in it are
        // not interchangeable.
        return checkpointId == that.checkpointId
                && totalBuckets == that.totalBuckets
                && Objects.equals(partition, that.partition)
                && Objects.equals(newFileChangelogFiles, that.newFileChangelogFiles)
                && Objects.equals(compactChangelogFiles, that.compactChangelogFiles);
    }

    /** Returns a textual form listing the partition and both changelog file maps. */
    public String toString() {
        return "ChangelogCompactionTask {partition = " + partition
                + ", newFileChangelogFiles = " + newFileChangelogFiles
                + ", compactChangelogFiles = " + compactChangelogFiles
                + "}";
    }

    /** Records where the bytes of one source changelog file were placed in the merged file. */
    private static class Result {
        /** Bucket the source file belongs to. */
        private final int bucket;
        /** Whether the source file came from the compaction changelog map. */
        private final boolean isCompactResult;
        /** Metadata of the source file. */
        private final DataFileMeta meta;
        /** Output stream position before the file's bytes were written. */
        private final long offset;
        /** Number of bytes the output stream position advanced while writing the file. */
        private final long length;

        private Result(int sourceBucket, boolean fromCompaction, DataFileMeta sourceMeta, long startOffset, long byteLength) {
            bucket = sourceBucket;
            isCompactResult = fromCompaction;
            meta = sourceMeta;
            offset = startOffset;
            length = byteLength;
        }
    }

    /**
     * Mutable holder for the merged output file: its path, the stream writing to it and a flag
     * telling whether {@code init} has been called yet.
     */
    private static class OutputStream {
        private Path path;
        private PositionOutputStream out;
        // Starts out false (the field default) and flips to true in init().
        private boolean isInitialized;

        private OutputStream() {
        }

        /** Stores the target path and its stream and marks this holder as initialised. */
        private void init(Path targetPath, PositionOutputStream stream) {
            path = targetPath;
            out = stream;
            isInitialized = true;
        }
    }

    /**
     * One unit of read work: loads a single changelog file into memory and keeps either the
     * bytes or the failure for the caller to inspect.
     */
    private static class ReadTask {
        private final FileStoreTable table;
        private final Path path;
        private final int bucket;
        private final boolean isCompactResult;
        private final DataFileMeta meta;
        /** File content; stays {@code null} until {@link #readFully()} succeeds in reading it. */
        private byte[] result = null;
        /** Failure captured by {@link #readFully()}; {@code null} if none occurred. */
        private Exception exception = null;

        private ReadTask(FileStoreTable sourceTable, Path sourcePath, int sourceBucket, boolean fromCompaction, DataFileMeta sourceMeta) {
            table = sourceTable;
            path = sourcePath;
            bucket = sourceBucket;
            isCompactResult = fromCompaction;
            meta = sourceMeta;
        }

        /**
         * Reads the whole file into {@code result} and then deletes the source file quietly.
         * Any exception is stored in {@code exception} instead of being thrown.
         */
        private void readFully() {
            try {
                InputStream in = table.fileIO().newInputStream(path);
                result = IOUtils.readFully(in, true);
                table.fileIO().deleteQuietly(path);
            } catch (Exception failure) {
                exception = failure;
            }
        }
    }
}

