/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.compact.changelog;

import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactTask;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class ChangelogCompactTaskTest {
    @TempDir
    java.nio.file.Path tempDir;

    @Test
    public void testExceptionWhenRead() throws Exception {
        FileSystemCatalog catalog = new FileSystemCatalog((FileIO)LocalFileIO.create(), new Path(this.tempDir.toString()));
        catalog.createDatabase("default", false);
        catalog.createTable(Identifier.create((String)"default", (String)"T"), new Schema(Arrays.asList(new DataField(0, "k", (DataType)DataTypes.INT()), new DataField(1, "v", (DataType)DataTypes.INT())), Collections.emptyList(), Collections.singletonList("k"), new HashMap(), ""), false);
        HashMap<Integer, List<DataFileMeta>> files = new HashMap<Integer, List<DataFileMeta>>();
        files.put(0, Collections.singletonList(DataFileMeta.forAppend((String)"unexisting-file", (long)128L, (long)0L, (SimpleStats)SimpleStats.EMPTY_STATS, (long)0L, (long)0L, (long)1L, Collections.emptyList(), null, null, null, null, null, null)));
        ChangelogCompactTask task = new ChangelogCompactTask(1L, BinaryRow.EMPTY_ROW, 1, files, new HashMap());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> task.doCompact((FileStoreTable)catalog.getTable(Identifier.create((String)"default", (String)"T")), Executors.newFixedThreadPool(1), MemorySize.ofMebiBytes((long)64L))).isInstanceOf(FileNotFoundException.class)).hasMessageContaining("unexisting-file");
    }

    @Test
    public void testManyExternalPaths() throws Exception {
        String warehouse = this.tempDir.toString() + "/warehouse";
        FileSystemCatalog catalog = new FileSystemCatalog((FileIO)LocalFileIO.create(), new Path(warehouse));
        catalog.createDatabase("default", false);
        ArrayList<String> externalPaths = new ArrayList<String>();
        for (int i = 0; i < 4; ++i) {
            String path = "file://" + this.tempDir.toString() + "/external" + i;
            externalPaths.add(path);
        }
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.BUCKET.key(), "2");
        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), CoreOptions.ChangelogProducer.INPUT.toString());
        options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), String.join((CharSequence)",", externalPaths));
        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), CoreOptions.ExternalPathStrategy.ROUND_ROBIN.toString());
        catalog.createTable(Identifier.create((String)"default", (String)"T"), new Schema(Arrays.asList(new DataField(0, "k", (DataType)DataTypes.INT()), new DataField(1, "v", (DataType)DataTypes.INT())), Collections.emptyList(), Collections.singletonList("k"), options, ""), false);
        FileStoreTable table = (FileStoreTable)catalog.getTable(Identifier.create((String)"default", (String)"T"));
        int numRecords = 10;
        HashMap<Integer, List> files = new HashMap<Integer, List>();
        TableWriteImpl write = table.newWrite("test");
        for (int i = 0; i < numRecords; ++i) {
            write.write((InternalRow)GenericRow.of((Object[])new Object[]{i, i * 10}));
            for (CommitMessage message : write.prepareCommit(false, 1L)) {
                CommitMessageImpl casted = (CommitMessageImpl)message;
                files.computeIfAbsent(message.bucket(), k -> new ArrayList()).addAll(casted.newFilesIncrement().changelogFiles());
            }
        }
        write.close();
        ChangelogCompactTask task = new ChangelogCompactTask(1L, BinaryRow.EMPTY_ROW, 2, files, new HashMap());
        List messages = task.doCompact(table, Executors.newFixedThreadPool(1), MemorySize.ofMebiBytes((long)64L)).stream().map(c -> (CommitMessageImpl)c.wrappedCommittable()).collect(Collectors.toList());
        TableCommitImpl commit = table.newCommit("test");
        commit.commit(messages);
        commit.close();
        StreamDataTableScan scan = table.newStreamScan();
        ArrayList<String> actual = new ArrayList<String>();
        while (actual.size() < numRecords) {
            TableScan.Plan plan = scan.plan();
            RecordReaderIterator it = new RecordReaderIterator(table.newRead().createReader(plan));
            while (it.hasNext()) {
                InternalRow row = (InternalRow)it.next();
                actual.add(String.format("(%d, %d)", row.getInt(0), row.getInt(1)));
            }
        }
        ArrayList<String> expected = new ArrayList<String>();
        for (int i = 0; i < numRecords; ++i) {
            expected.add(String.format("(%d, %d)", i, i * 10));
        }
        Assertions.assertThat(actual).hasSameElementsAs(expected);
    }
}

