/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.sink.BucketPartitionKeySelector;
import org.apache.iceberg.flink.sink.BucketPartitioner;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.TestBucketPartitionerUtil;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class TestBucketPartitionerFlinkIcebergSink {
    private static final int NUMBER_TASK_MANAGERS = 1;
    private static final int SLOTS_PER_TASK_MANAGER = 8;
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(8).setConfiguration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG).build());
    @RegisterExtension
    private static final HadoopCatalogExtension catalogExtension = new HadoopCatalogExtension("default", "t");
    private static final TypeInformation<Row> ROW_TYPE_INFO = new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
    private final int parallelism = 8;
    private final FileFormat format = FileFormat.PARQUET;
    private final int numBuckets = 4;
    private Table table;
    private StreamExecutionEnvironment env;
    private TableLoader tableLoader;

    private void setupEnvironment(TestBucketPartitionerUtil.TableSchemaType tableSchemaType) {
        PartitionSpec partitionSpec = tableSchemaType.getPartitionSpec(4);
        this.table = catalogExtension.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SimpleDataUtil.SCHEMA, partitionSpec, (Map)ImmutableMap.of((Object)"write.format.default", (Object)this.format.name()));
        this.env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG).enableCheckpointing(100L).setParallelism(8).setMaxParallelism(16);
        this.tableLoader = catalogExtension.tableLoader();
    }

    private void appendRowsToTable(List<RowData> allRows) throws Exception {
        DataFormatConverters.RowConverter converter = new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
        DataStream dataStream = this.env.addSource(new BoundedTestSource<Row>((Row[])allRows.stream().map(arg_0 -> ((DataFormatConverters.RowConverter)converter).toExternal(arg_0)).toArray(Row[]::new)), ROW_TYPE_INFO).map(arg_0 -> ((DataFormatConverters.RowConverter)converter).toInternal(arg_0), FlinkCompatibilityUtil.toTypeInfo((RowType)SimpleDataUtil.ROW_TYPE)).partitionCustom((Partitioner)new BucketPartitioner(this.table.spec()), (KeySelector)new BucketPartitionKeySelector(this.table.spec(), this.table.schema(), FlinkSink.toFlinkRowType((Schema)this.table.schema(), (TableSchema)SimpleDataUtil.FLINK_SCHEMA)));
        FlinkSink.forRowData((DataStream)dataStream).table(this.table).tableLoader(this.tableLoader).writeParallelism(8).distributionMode(DistributionMode.NONE).append();
        this.env.execute("Test Iceberg DataStream");
        SimpleDataUtil.assertTableRows(this.table, allRows);
    }

    @ParameterizedTest
    @EnumSource(value=TestBucketPartitionerUtil.TableSchemaType.class, names={"ONE_BUCKET", "IDENTITY_AND_BUCKET"})
    public void testSendRecordsToAllBucketsEvenly(TestBucketPartitionerUtil.TableSchemaType tableSchemaType) throws Exception {
        this.setupEnvironment(tableSchemaType);
        List<RowData> rows = this.generateTestDataRows();
        this.appendRowsToTable(rows);
        TableTestStats stats = this.extractPartitionResults(tableSchemaType);
        Assertions.assertThat((int)stats.totalRowCount).isEqualTo(rows.size());
        Assertions.assertThat((int)stats.writersPerBucket.size()).isEqualTo(4);
        Assertions.assertThat((int)stats.numFilesPerBucket.size()).isEqualTo(4);
        int i = 0;
        int j = 4;
        while (i < 4) {
            Assertions.assertThat(stats.writersPerBucket.get(i)).hasSameElementsAs(Arrays.asList(i, j));
            Assertions.assertThat((Integer)stats.numFilesPerBucket.get(i)).isEqualTo(2);
            Assertions.assertThat((Long)stats.rowsPerWriter.get(i)).isEqualTo(2L);
            ++i;
            ++j;
        }
    }

    private List<RowData> generateTestDataRows() {
        int totalNumRows = 16;
        int numRowsPerBucket = totalNumRows / 4;
        return TestBucketPartitionerUtil.generateRowsForBucketIdRange(numRowsPerBucket, 4);
    }

    private TableTestStats extractPartitionResults(TestBucketPartitionerUtil.TableSchemaType tableSchemaType) throws IOException {
        int totalRecordCount = 0;
        HashMap writersPerBucket = Maps.newHashMap();
        HashMap filesPerBucket = Maps.newHashMap();
        HashMap rowsPerWriter = Maps.newHashMap();
        try (CloseableIterable fileScanTasks = this.table.newScan().planFiles();){
            for (FileScanTask scanTask : fileScanTasks) {
                long recordCountInFile = ((DataFile)scanTask.file()).recordCount();
                String[] splitFilePath = ((DataFile)scanTask.file()).path().toString().split("/");
                String filename = splitFilePath[splitFilePath.length - 1];
                int writerId = Integer.parseInt(filename.split("-")[0]);
                totalRecordCount = (int)((long)totalRecordCount + recordCountInFile);
                int bucketId = (Integer)((DataFile)scanTask.file()).partition().get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class);
                writersPerBucket.computeIfAbsent(bucketId, k -> Lists.newArrayList());
                ((List)writersPerBucket.get(bucketId)).add(writerId);
                filesPerBucket.put(bucketId, filesPerBucket.getOrDefault(bucketId, 0) + 1);
                rowsPerWriter.put(writerId, rowsPerWriter.getOrDefault(writerId, 0L) + recordCountInFile);
            }
        }
        return new TableTestStats(totalRecordCount, writersPerBucket, filesPerBucket, rowsPerWriter);
    }

    private static class TableTestStats {
        final int totalRowCount;
        final Map<Integer, List<Integer>> writersPerBucket;
        final Map<Integer, Integer> numFilesPerBucket;
        final Map<Integer, Long> rowsPerWriter;

        TableTestStats(int totalRecordCount, Map<Integer, List<Integer>> writersPerBucket, Map<Integer, Integer> numFilesPerBucket, Map<Integer, Long> rowsPerWriter) {
            this.totalRowCount = totalRecordCount;
            this.writersPerBucket = writersPerBucket;
            this.numFilesPerBucket = numFilesPerBucket;
            this.rowsPerWriter = rowsPerWriter;
        }
    }
}

