/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;

public class ChangelogCompactTask
implements Serializable {
    private final long checkpointId;
    private final BinaryRow partition;
    private final Map<Integer, List<DataFileMeta>> newFileChangelogFiles;
    private final Map<Integer, List<DataFileMeta>> compactChangelogFiles;

    public ChangelogCompactTask(long checkpointId, BinaryRow partition, Map<Integer, List<DataFileMeta>> newFileChangelogFiles, Map<Integer, List<DataFileMeta>> compactChangelogFiles) {
        this.checkpointId = checkpointId;
        this.partition = partition;
        this.newFileChangelogFiles = newFileChangelogFiles;
        this.compactChangelogFiles = compactChangelogFiles;
    }

    public long checkpointId() {
        return this.checkpointId;
    }

    public BinaryRow partition() {
        return this.partition;
    }

    public Map<Integer, List<DataFileMeta>> newFileChangelogFiles() {
        return this.newFileChangelogFiles;
    }

    public Map<Integer, List<DataFileMeta>> compactChangelogFiles() {
        return this.compactChangelogFiles;
    }

    public List<Committable> doCompact(FileStoreTable table) throws Exception {
        DataFilePathFactory dataFilePathFactory;
        FileStorePathFactory pathFactory = table.store().pathFactory();
        OutputStream outputStream = new OutputStream();
        ArrayList<Result> results = new ArrayList<Result>();
        for (Map.Entry<Integer, List<DataFileMeta>> entry : this.newFileChangelogFiles.entrySet()) {
            int bucket = entry.getKey();
            dataFilePathFactory = pathFactory.createDataFilePathFactory(this.partition, bucket);
            for (DataFileMeta meta : entry.getValue()) {
                this.copyFile(outputStream, results, table, dataFilePathFactory.toPath(meta), bucket, false, meta);
            }
        }
        for (Map.Entry<Integer, List<DataFileMeta>> entry : this.compactChangelogFiles.entrySet()) {
            Integer bucket = entry.getKey();
            dataFilePathFactory = pathFactory.createDataFilePathFactory(this.partition, bucket);
            for (DataFileMeta meta : entry.getValue()) {
                this.copyFile(outputStream, results, table, dataFilePathFactory.toPath(meta), bucket, true, meta);
            }
        }
        outputStream.out.close();
        return this.produceNewCommittables(results, table, pathFactory, outputStream.path);
    }

    private void copyFile(OutputStream outputStream, List<Result> results, FileStoreTable table, Path path, int bucket, boolean isCompactResult, DataFileMeta meta) throws Exception {
        if (!outputStream.isInitialized) {
            Path outputPath = new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
            outputStream.init(outputPath, table.fileIO().newOutputStream(outputPath, false));
        }
        long offset = outputStream.out.getPos();
        try (SeekableInputStream in = table.fileIO().newInputStream(path);){
            IOUtils.copyBytes(in, outputStream.out, 4096, false);
        }
        table.fileIO().deleteQuietly(path);
        results.add(new Result(bucket, isCompactResult, meta, offset, outputStream.out.getPos() - offset));
    }

    private List<Committable> produceNewCommittables(List<Result> results, FileStoreTable table, FileStorePathFactory pathFactory, Path changelogTempPath) throws IOException {
        Result baseResult = results.get(0);
        Preconditions.checkArgument(baseResult.offset == 0L);
        DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(this.partition, baseResult.bucket);
        String realName = "compacted-changelog-" + UUID.randomUUID() + "$" + baseResult.bucket + "-" + baseResult.length;
        table.fileIO().rename(changelogTempPath, dataFilePathFactory.toAlignedPath(realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier(baseResult.meta.fileFormat()), baseResult.meta));
        ArrayList<Committable> newCommittables = new ArrayList<Committable>();
        HashMap<Integer, List> bucketedResults = new HashMap<Integer, List>();
        for (Result result : results) {
            bucketedResults.computeIfAbsent(result.bucket, b -> new ArrayList()).add(result);
        }
        for (Map.Entry entry : bucketedResults.entrySet()) {
            ArrayList<DataFileMeta> newFilesChangelog = new ArrayList<DataFileMeta>();
            ArrayList<DataFileMeta> compactChangelog = new ArrayList<DataFileMeta>();
            for (Result result : (List)entry.getValue()) {
                String name = (result.offset == 0L ? realName : realName + "-" + result.offset + "-" + result.length) + "." + CompactedChangelogReadOnlyFormat.getIdentifier(result.meta.fileFormat());
                if (result.isCompactResult) {
                    compactChangelog.add(result.meta.rename(name));
                    continue;
                }
                newFilesChangelog.add(result.meta.rename(name));
            }
            CommitMessageImpl newMessage = new CommitMessageImpl(this.partition, (Integer)entry.getKey(), new DataIncrement(Collections.emptyList(), Collections.emptyList(), newFilesChangelog), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), compactChangelog));
            newCommittables.add(new Committable(this.checkpointId, Committable.Kind.FILE, newMessage));
        }
        return newCommittables;
    }

    public int hashCode() {
        return Objects.hash(this.checkpointId, this.partition, this.newFileChangelogFiles, this.compactChangelogFiles);
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        ChangelogCompactTask that = (ChangelogCompactTask)o;
        return this.checkpointId == that.checkpointId && Objects.equals(this.partition, that.partition) && Objects.equals(this.newFileChangelogFiles, that.newFileChangelogFiles) && Objects.equals(this.compactChangelogFiles, that.compactChangelogFiles);
    }

    public String toString() {
        return String.format("ChangelogCompactionTask {partition = %s, newFileChangelogFiles = %s, compactChangelogFiles = %s}", this.partition, this.newFileChangelogFiles, this.compactChangelogFiles);
    }

    private static class Result {
        private final int bucket;
        private final boolean isCompactResult;
        private final DataFileMeta meta;
        private final long offset;
        private final long length;

        private Result(int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) {
            this.bucket = bucket;
            this.isCompactResult = isCompactResult;
            this.meta = meta;
            this.offset = offset;
            this.length = length;
        }
    }

    private static class OutputStream {
        private Path path;
        private PositionOutputStream out;
        private boolean isInitialized = false;

        private OutputStream() {
        }

        private void init(Path path, PositionOutputStream out) {
            this.path = path;
            this.out = out;
            this.isInitialized = true;
        }
    }
}

