/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializationUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class CompactorSourceITCase
extends AbstractTestBase {
    private static final RowType ROW_TYPE = RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()}, (String[])new String[]{"k", "v", "dt", "hh"});
    private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();
    private Path tablePath;
    private String commitUser;

    @BeforeEach
    public void before() throws IOException {
        this.tablePath = new Path(this.getTempDirPath());
        this.commitUser = UUID.randomUUID().toString();
    }

    @ParameterizedTest(name="defaultOptions = {0}")
    @ValueSource(booleans={true, false})
    public void testBatchRead(boolean defaultOptions) throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        if (!defaultOptions) {
            table = table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
        }
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite write = streamWriteBuilder.newWrite();
        StreamTableCommit commit = streamWriteBuilder.newCommit();
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(2, 1620, BinaryString.fromString((String)"20221208"), 16));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write((InternalRow)this.rowData(1, 1511, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221209"), 15));
        commit.commit(1L, write.prepareCommit(true, 1L));
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
        DataStreamSource compactorSource = new CompactorSourceBuilder("test", table).withContinuousMode(false).withEnv(env).build();
        CloseableIterator it = compactorSource.executeAndCollect();
        ArrayList<String> actual = new ArrayList<String>();
        while (it.hasNext()) {
            actual.add(this.toString((RowData)it.next()));
        }
        Assertions.assertThat(actual).hasSameElementsAs(Arrays.asList("+I 2|20221208|15|0|0", "+I 2|20221208|16|0|0", "+I 2|20221209|15|0|0"));
        write.close();
        commit.close();
        it.close();
    }

    @ParameterizedTest(name="defaultOptions = {0}")
    @ValueSource(booleans={true, false})
    public void testStreamingRead(boolean defaultOptions) throws Exception {
        int i;
        FileStoreTable table = this.createFileStoreTable();
        if (!defaultOptions) {
            HashMap<String, String> dynamicOptions = new HashMap<String, String>();
            dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
            dynamicOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), CoreOptions.ChangelogProducer.NONE.toString());
            dynamicOptions.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "0");
            table = table.copy(dynamicOptions);
        }
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite write = streamWriteBuilder.newWrite();
        StreamTableCommit commit = streamWriteBuilder.newCommit();
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(2, 1620, BinaryString.fromString((String)"20221208"), 16));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write((InternalRow)this.rowData(1, 1511, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221209"), 15));
        write.compact(this.binaryRow("20221208", 15), 0, true);
        write.compact(this.binaryRow("20221209", 15), 0, true);
        commit.commit(1L, write.prepareCommit(true, 1L));
        write.write((InternalRow)this.rowData(2, 1520, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(2, 1621, BinaryString.fromString((String)"20221208"), 16));
        commit.commit(2L, write.prepareCommit(true, 2L));
        write.write((InternalRow)this.rowData(1, 1512, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(2, 1620, BinaryString.fromString((String)"20221209"), 16));
        commit.commit(3L, write.prepareCommit(true, 3L));
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
        DataStreamSource compactorSource = new CompactorSourceBuilder("test", table).withContinuousMode(true).withEnv(env).build();
        CloseableIterator it = compactorSource.executeAndCollect();
        ArrayList<String> actual = new ArrayList<String>();
        for (i = 0; i < 4; ++i) {
            actual.add(this.toString((RowData)it.next()));
        }
        Assertions.assertThat(actual).hasSameElementsAs(Arrays.asList("+I 4|20221208|15|0|1", "+I 4|20221208|16|0|1", "+I 5|20221208|15|0|1", "+I 5|20221209|16|0|1"));
        write.write((InternalRow)this.rowData(2, 1520, BinaryString.fromString((String)"20221209"), 15));
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221208"), 16));
        write.write((InternalRow)this.rowData(1, 1511, BinaryString.fromString((String)"20221209"), 15));
        commit.commit(4L, write.prepareCommit(true, 4L));
        actual.clear();
        for (i = 0; i < 2; ++i) {
            actual.add(this.toString((RowData)it.next()));
        }
        Assertions.assertThat(actual).hasSameElementsAs(Arrays.asList("+I 6|20221208|16|0|1", "+I 6|20221209|15|0|1"));
        write.close();
        commit.close();
        it.close();
    }

    @Test
    public void testStreamingPartitionSpec() throws Exception {
        this.testPartitionSpec(true, this.getSpecifiedPartitions(), Arrays.asList("+I 1|20221208|16|0|1", "+I 2|20221209|15|0|1", "+I 3|20221208|16|0|1", "+I 3|20221209|15|0|1"));
    }

    @Test
    public void testBatchPartitionSpec() throws Exception {
        this.testPartitionSpec(false, this.getSpecifiedPartitions(), Arrays.asList("+I 3|20221208|16|0|0", "+I 3|20221209|15|0|0"));
    }

    private List<Map<String, String>> getSpecifiedPartitions() {
        HashMap<String, String> partition1 = new HashMap<String, String>();
        partition1.put("dt", "20221208");
        partition1.put("hh", "16");
        HashMap<String, String> partition2 = new HashMap<String, String>();
        partition2.put("dt", "20221209");
        partition2.put("hh", "15");
        return Arrays.asList(partition1, partition2);
    }

    private void testPartitionSpec(boolean isStreaming, List<Map<String, String>> specifiedPartitions, List<String> expected) throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite write = streamWriteBuilder.newWrite();
        StreamTableCommit commit = streamWriteBuilder.newCommit();
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(2, 1620, BinaryString.fromString((String)"20221208"), 16));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write((InternalRow)this.rowData(2, 1520, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(2, 1520, BinaryString.fromString((String)"20221209"), 15));
        commit.commit(1L, write.prepareCommit(true, 1L));
        write.write((InternalRow)this.rowData(1, 1511, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(1, 1610, BinaryString.fromString((String)"20221208"), 16));
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221209"), 15));
        commit.commit(2L, write.prepareCommit(true, 2L));
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
        DataStreamSource compactorSource = new CompactorSourceBuilder("test", table).withContinuousMode(isStreaming).withEnv(env).withPartitionPredicate(PartitionPredicate.fromMaps((RowType)table.schema().logicalPartitionType(), specifiedPartitions, (String)table.coreOptions().partitionDefaultName())).build();
        CloseableIterator it = compactorSource.executeAndCollect();
        ArrayList<String> actual = new ArrayList<String>();
        for (int i = 0; i < expected.size(); ++i) {
            actual.add(this.toString((RowData)it.next()));
        }
        Assertions.assertThat(actual).hasSameElementsAs(expected);
        write.close();
        commit.close();
        it.close();
    }

    @ParameterizedTest(name="defaultOptions = {0}")
    @ValueSource(booleans={true, false})
    public void testHistoryPartitionRead(boolean defaultOptions) throws Exception {
        Duration partitionIdleTime = Duration.ofMillis(3000L);
        FileStoreTable table = this.createFileStoreTable();
        if (!defaultOptions) {
            table = table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
        }
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        StreamTableWrite write = streamWriteBuilder.newWrite();
        StreamTableCommit commit = streamWriteBuilder.newCommit();
        write.write((InternalRow)this.rowData(1, 1510, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(1, 1620, BinaryString.fromString((String)"20221208"), 16));
        commit.commit(0L, write.prepareCommit(true, 0L));
        write.write((InternalRow)this.rowData(2, 1511, BinaryString.fromString((String)"20221208"), 15));
        write.write((InternalRow)this.rowData(2, 1510, BinaryString.fromString((String)"20221209"), 15));
        commit.commit(1L, write.prepareCommit(true, 1L));
        Thread.sleep(3000L);
        write.write((InternalRow)this.rowData(3, 1510, BinaryString.fromString((String)"20221208"), 16));
        commit.commit(2L, write.prepareCommit(true, 2L));
        StreamExecutionEnvironment env = this.streamExecutionEnvironmentBuilder().streamingMode().build();
        DataStreamSource compactorSource = new CompactorSourceBuilder("test", table).withContinuousMode(false).withPartitionIdleTime(partitionIdleTime).withEnv(env).build();
        CloseableIterator it = compactorSource.executeAndCollect();
        ArrayList<String> actual = new ArrayList<String>();
        while (it.hasNext()) {
            actual.add(this.toString((RowData)it.next()));
        }
        Assertions.assertThat(actual).hasSameElementsAs(Arrays.asList("+I 3|20221208|15|0|0", "+I 3|20221209|15|0|0"));
        write.close();
        commit.close();
        it.close();
    }

    private String toString(RowData rowData) {
        int numFiles;
        try {
            numFiles = this.dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        BinaryRow partition = SerializationUtils.deserializeBinaryRow((byte[])rowData.getBinary(1));
        return String.format("%s %d|%s|%d|%d|%d", rowData.getRowKind().shortString(), rowData.getLong(0), partition.getString(0), partition.getInt(1), rowData.getInt(2), numFiles);
    }

    private GenericRow rowData(Object ... values) {
        return GenericRow.of((Object[])values);
    }

    private BinaryRow binaryRow(String dt, int hh) {
        BinaryRow b = new BinaryRow(2);
        BinaryRowWriter writer = new BinaryRowWriter(b);
        writer.writeString(0, BinaryString.fromString((String)dt));
        writer.writeInt(1, hh);
        writer.complete();
        return b;
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        SchemaManager schemaManager = new SchemaManager((FileIO)LocalFileIO.create(), this.tablePath);
        TableSchema tableSchema = schemaManager.createTable(new Schema(ROW_TYPE.getFields(), Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.singletonMap("bucket", "1"), ""));
        return FileStoreTableFactory.create((FileIO)LocalFileIO.create(), (Path)this.tablePath, (TableSchema)tableSchema);
    }
}

