/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.MultiTablesStoreCompactOperator;
import org.apache.paimon.flink.sink.StoreCompactOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for {@link CompactorSinkBuilder}.
 *
 * <p>Every test works on a fresh table located under a temporary directory. The table has the
 * columns {@code (k, v, hh, dt)}, is partitioned by {@code (dt, hh)}, uses {@code (dt, hh, k)} as
 * primary key and is configured with a single bucket.
 */
public class CompactorSinkITCase extends AbstractTestBase {

    /** Row layout shared by every table created in this test class. */
    private static final RowType ROW_TYPE =
            RowType.of(
                    new DataType[] {
                        DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
                    },
                    new String[] {"k", "v", "hh", "dt"});

    private Path tablePath;
    private String commitUser;

    /** Points the test at a fresh temporary table location and generates a unique commit user. */
    @BeforeEach
    public void before() throws IOException {
        tablePath = new Path(getTempDirPath());
        commitUser = UUID.randomUUID().toString();
    }

    /**
     * Writes two append snapshots, runs a batch compaction job restricted to the partitions
     * returned by {@link #getSpecifiedPartitions()} and verifies that exactly one additional
     * {@code COMPACT} snapshot is produced, leaving one data file in each {@code hh=15} partition
     * and two data files in the remaining one.
     */
    @Test
    public void testCompact() throws Exception {
        FileStoreTable table = createFileStoreTable();
        SnapshotManager snapshotManager = table.snapshotManager();

        prepareDataFile(table);

        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
        Assertions.assertThat(snapshot.id()).isEqualTo(2L);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);

        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
        Predicate partitionPredicate =
                PartitionPredicate.createPartitionPredicate(
                        getSpecifiedPartitions(),
                        table.rowType(),
                        table.coreOptions().partitionDefaultName());
        DataStreamSource<RowData> source =
                new CompactorSourceBuilder(tablePath.toString(), table)
                        .withEnv(env)
                        .withContinuousMode(false)
                        .withPartitionPredicate(partitionPredicate)
                        .build();
        new CompactorSinkBuilder(table, true).withInput(source).build();
        env.execute();

        snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
        Assertions.assertThat(snapshot.id()).isEqualTo(3L);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);

        TableScan.Plan plan = table.newReadBuilder().newScan().plan();
        Assertions.assertThat(plan.splits()).hasSize(3);
        for (Split split : plan.splits()) {
            DataSplit dataSplit = (DataSplit) split;
            // Partition field 1 is "hh"; only the hh=15 partitions were selected for compaction.
            int expectedFiles = dataSplit.partition().getInt(1) == 15 ? 1 : 2;
            Assertions.assertThat(dataSplit.dataFiles()).hasSize(expectedFiles);
        }
    }

    /**
     * Verifies that the {@code sink.parallelism} table option is applied to the first
     * transformation registered in the environment once the compactor sink has been built.
     */
    @Test
    public void testCompactParallelism() throws Exception {
        FileStoreTable table = createFileStoreTable();

        StreamExecutionEnvironment env =
                streamExecutionEnvironmentBuilder().streamingMode().build();
        Predicate partitionPredicate =
                PartitionPredicate.createPartitionPredicate(
                        getSpecifiedPartitions(),
                        table.rowType(),
                        table.coreOptions().partitionDefaultName());
        DataStreamSource<RowData> source =
                new CompactorSourceBuilder(tablePath.toString(), table)
                        .withEnv(env)
                        .withContinuousMode(false)
                        .withPartitionPredicate(partitionPredicate)
                        .build();

        // Random value in [1, 100] so the check cannot pass by coincidence with a default.
        int sinkParallelism = new Random().nextInt(100) + 1;
        Map<String, String> dynamicOptions =
                Collections.singletonMap(
                        FlinkConnectorOptions.SINK_PARALLELISM.key(),
                        String.valueOf(sinkParallelism));
        new CompactorSinkBuilder(table.copy(dynamicOptions), false).withInput(source).build();

        Transformation<?> firstTransformation = env.getTransformations().get(0);
        Assertions.assertThat(firstTransformation.getParallelism()).isEqualTo(sinkParallelism);
    }

    /**
     * Returns the partition specs the compaction is restricted to: {@code dt=20221208,hh=15} and
     * {@code dt=20221209,hh=15}.
     */
    private List<Map<String, String>> getSpecifiedPartitions() {
        Map<String, String> partition1 = new HashMap<>();
        partition1.put("dt", "20221208");
        partition1.put("hh", "15");

        Map<String, String> partition2 = new HashMap<>();
        partition2.put("dt", "20221209");
        partition2.put("hh", "15");

        return Arrays.asList(partition1, partition2);
    }

    /** Wraps the given field values in a {@link GenericRow}. */
    private GenericRow rowData(Object... values) {
        return GenericRow.of(values);
    }

    /**
     * Creates the schema at {@link #tablePath} and returns the resulting table, partitioned by
     * {@code (dt, hh)}, keyed by {@code (dt, hh, k)} and using a single bucket.
     */
    private FileStoreTable createFileStoreTable() throws Exception {
        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
        TableSchema tableSchema =
                schemaManager.createTable(
                        new Schema(
                                ROW_TYPE.getFields(),
                                Arrays.asList("dt", "hh"),
                                Arrays.asList("dt", "hh", "k"),
                                Collections.singletonMap("bucket", "1"),
                                ""));
        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
    }

    /**
     * Creates an unpartitioned, single-bucket table with primary key {@code k} in the given
     * catalog and returns it.
     */
    private FileStoreTable createCatalogTable(Catalog catalog, Identifier tableIdentifier)
            throws Exception {
        Schema tableSchema =
                new Schema(
                        ROW_TYPE.getFields(),
                        Collections.emptyList(),
                        Collections.singletonList("k"),
                        Collections.singletonMap("bucket", "1"),
                        "");
        catalog.createTable(tableIdentifier, tableSchema, false);
        return (FileStoreTable) catalog.getTable(tableIdentifier);
    }

    /** Builds a test harness around {@code operator} whose output is {@link Committable}. */
    private OneInputStreamOperatorTestHarness<RowData, Committable> createTestHarness(
            OneInputStreamOperator<RowData, Committable> operator) throws Exception {
        TypeSerializer<Committable> serializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<RowData, Committable> harness =
                new OneInputStreamOperatorTestHarness<>(operator);
        harness.setup(serializer);
        return harness;
    }

    /**
     * Builds a test harness around {@code operator} whose output is {@link
     * MultiTableCommittable}.
     */
    private OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
            createMultiTablesTestHarness(
                    OneInputStreamOperator<RowData, MultiTableCommittable> operator)
                    throws Exception {
        TypeSerializer<MultiTableCommittable> serializer =
                new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable> harness =
                new OneInputStreamOperatorTestHarness<>(operator);
        harness.setup(serializer);
        return harness;
    }

    /**
     * Creates a compact operator factory for {@code table} whose writes are backed by {@link
     * StoreSinkWriteImpl}.
     */
    protected StoreCompactOperator.Factory createCompactOperator(FileStoreTable table) {
        return new StoreCompactOperator.Factory(
                table,
                (StoreSinkWrite.Provider & Serializable)
                        (t, user, state, ioManager, memoryPool, metricGroup) ->
                                new StoreSinkWriteImpl(
                                        t,
                                        user,
                                        state,
                                        ioManager,
                                        false,
                                        false,
                                        false,
                                        memoryPool,
                                        metricGroup),
                "test",
                true);
    }

    /**
     * Creates a multi-table compact operator factory that loads tables through {@code
     * catalogLoader} and commits as {@link #commitUser}.
     */
    protected MultiTablesStoreCompactOperator.Factory createMultiTablesCompactOperator(
            CatalogLoader catalogLoader) throws Exception {
        return new MultiTablesStoreCompactOperator.Factory(
                catalogLoader,
                commitUser,
                new CheckpointConfig(),
                false,
                false,
                true,
                new Options());
    }

    /** Serializes a two-field {@code (dt, hh)} partition row into its binary form. */
    private static byte[] partition(String dt, int hh) {
        BinaryRow row = new BinaryRow(2);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeString(0, BinaryString.fromString(dt));
        writer.writeInt(1, hh);
        writer.complete();
        return SerializationUtils.serializeBinaryRow(row);
    }

    /**
     * Writes the same three rows (keys 1, then 2) into partitions {@code 20221208/15}, {@code
     * 20221208/16} and {@code 20221209/15} in two separate commits, so each partition ends up
     * with two data files.
     *
     * <p>The writer and committer are closed even if a write or commit fails.
     */
    private void prepareDataFile(FileStoreTable table) throws Exception {
        StreamWriteBuilder streamWriteBuilder =
                table.newStreamWriteBuilder().withCommitUser(commitUser);
        try (StreamTableWrite write = streamWriteBuilder.newWrite();
                StreamTableCommit commit = streamWriteBuilder.newCommit()) {
            write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
            write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
            write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
            commit.commit(0L, write.prepareCommit(true, 0L));

            write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
            write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
            write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
            commit.commit(1L, write.prepareCommit(true, 1L));
        }
    }
}

