/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class CompactorSinkITCase
extends AbstractTestBase {
    private static final RowType ROW_TYPE = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()}, (String[])new String[]{"k", "v", "hh", "dt"});
    private Path tablePath;
    private String commitUser;

    @BeforeEach
    public void before() throws IOException {
        this.tablePath = new Path(this.getTempDirPath());
        this.commitUser = UUID.randomUUID().toString();
    }

    @Test
    public void testCompact() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        SnapshotManager snapshotManager = table.snapshotManager();
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite write = streamWriteBuilder.newWrite();
        StreamTableCommit commit = streamWriteBuilder.newCommit();
        write.write((InternalRow)this.rowData(1, 100, 15, BinaryString.fromString((String)"20221208")));
        write.write((InternalRow)this.rowData(1, 100, 16, BinaryString.fromString((String)"20221208")));
        write.write((InternalRow)this.rowData(1, 100, 15, BinaryString.fromString((String)"20221209")));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write((InternalRow)this.rowData(2, 200, 15, BinaryString.fromString((String)"20221208")));
        write.write((InternalRow)this.rowData(2, 200, 16, BinaryString.fromString((String)"20221208")));
        write.write((InternalRow)this.rowData(2, 200, 15, BinaryString.fromString((String)"20221209")));
        commit.commit(1L, write.prepareCommit(true, 1L));
        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assertions.assertThat((long)snapshot.id()).isEqualTo(2L);
        Assertions.assertThat((Comparable)snapshot.commitKind()).isEqualTo((Object)Snapshot.CommitKind.APPEND);
        write.close();
        commit.close();
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().batchMode().build();
        CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(this.tablePath.toString(), table);
        DataStreamSource source = sourceBuilder.withEnv(env).withContinuousMode(false).withPartitionPredicate(PartitionPredicate.fromMaps((RowType)table.schema().logicalPartitionType(), this.getSpecifiedPartitions(), (String)table.coreOptions().partitionDefaultName())).build();
        new CompactorSinkBuilder(table, true).withInput((DataStream)source).build();
        env.execute();
        snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assertions.assertThat((long)snapshot.id()).isEqualTo(3L);
        Assertions.assertThat((Comparable)snapshot.commitKind()).isEqualTo((Object)Snapshot.CommitKind.COMPACT);
        TableScan.Plan plan = table.newReadBuilder().newScan().plan();
        Assertions.assertThat((int)plan.splits().size()).isEqualTo(3);
        for (Split split : plan.splits()) {
            DataSplit dataSplit = (DataSplit)split;
            if (dataSplit.partition().getInt(1) == 15) {
                Assertions.assertThat((int)dataSplit.dataFiles().size()).isEqualTo(1);
                continue;
            }
            Assertions.assertThat((int)dataSplit.dataFiles().size()).isEqualTo(2);
        }
    }

    @Test
    public void testCompactParallelism() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
        CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(this.tablePath.toString(), table);
        DataStreamSource source = sourceBuilder.withEnv(env).withContinuousMode(false).withPartitionPredicate(PartitionPredicate.fromMaps((RowType)table.schema().logicalPartitionType(), this.getSpecifiedPartitions(), (String)table.coreOptions().partitionDefaultName())).build();
        final Integer sinkParalellism = new Random().nextInt(100) + 1;
        new CompactorSinkBuilder(table.copy((Map)new HashMap<String, String>(){
            {
                this.put(FlinkConnectorOptions.SINK_PARALLELISM.key(), String.valueOf(sinkParalellism));
            }
        }), false).withInput((DataStream)source).build();
        Assertions.assertThat((int)((Transformation)env.getTransformations().get(0)).getParallelism()).isEqualTo((Object)sinkParalellism);
    }

    private List<Map<String, String>> getSpecifiedPartitions() {
        HashMap<String, String> partition1 = new HashMap<String, String>();
        partition1.put("dt", "20221208");
        partition1.put("hh", "15");
        HashMap<String, String> partition2 = new HashMap<String, String>();
        partition2.put("dt", "20221209");
        partition2.put("hh", "15");
        return Arrays.asList(partition1, partition2);
    }

    private GenericRow rowData(Object ... values) {
        return GenericRow.of((Object[])values);
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        SchemaManager schemaManager = new SchemaManager((FileIO)LocalFileIO.create(), this.tablePath);
        TableSchema tableSchema = schemaManager.createTable(new Schema(ROW_TYPE.getFields(), Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.singletonMap("bucket", "1"), ""));
        return FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Path)this.tablePath, (TableSchema)tableSchema);
    }
}

