/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Tuple3;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionBase;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionExtensive;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Parameterized test of the delta streamer write flow with the error table enabled.
 *
 * <p>Each case writes a parquet source, optionally adds a batch of records whose
 * {@code _row_key} has been replaced by an empty string, runs a single sync and then
 * checks the base table row count, the number of instants on the active timeline and
 * the contents of the error table.
 */
class TestHoodieDeltaStreamerErrorTableWriteFlow
extends TestHoodieDeltaStreamerSchemaEvolutionBase {

    /**
     * Runs one end-to-end sync and validates the outcome.
     *
     * @param sourceGenInfo tuple of (total record count, error record count, number of source files)
     * @throws Exception if preparing the source data or running the sync fails
     */
    protected void testBase(Tuple3<Integer, Integer, Integer> sourceGenInfo) throws Exception {
        int totalRecords = sourceGenInfo.f0;
        int errorRecords = sourceGenInfo.f1;
        int numFiles = sourceGenInfo.f2;
        int validRecords = totalRecords - errorRecords;

        // Every invocation gets its own source directory; testNum is bumped as a side effect.
        PARQUET_SOURCE_ROOT = basePath + "parquetFilesDfs" + testNum++;
        if (totalRecords <= 0) {
            // Nothing to generate: just make sure the (empty) source directory exists.
            fs.mkdirs(new Path(PARQUET_SOURCE_ROOT));
        } else {
            if (numFiles > 1) {
                TestHoodieDeltaStreamerErrorTableWriteFlow.prepareParquetDFSMultiFiles(validRecords, PARQUET_SOURCE_ROOT, numFiles);
            } else {
                TestHoodieDeltaStreamerErrorTableWriteFlow.prepareParquetDFSFiles(validRecords, PARQUET_SOURCE_ROOT);
            }
            if (errorRecords > 0) {
                // Generate the bad batch in a separate directory, blank out the record key and
                // hand the rows to addParquetData. Presumably the empty key is what makes these
                // rows end up in the error table - confirm against the base class / writer.
                String errorSourceRoot = basePath + "parquetErrorFilesDfs" + testNum++;
                TestHoodieDeltaStreamerErrorTableWriteFlow.prepareParquetDFSFiles(errorRecords, errorSourceRoot);
                Dataset<Row> badRows = sparkSession.read()
                    .parquet(errorSourceRoot)
                    .withColumn("_row_key", functions.lit(""));
                this.addParquetData(badRows, false);
            }
        }

        // The table name is derived from testNum after the increments above.
        this.tableName = "test_parquet_table" + testNum;
        this.tableBasePath = basePath + this.tableName;
        this.deltaStreamer = new HoodieDeltaStreamer(this.getDeltaStreamerConfig(), jsc);
        this.deltaStreamer.sync();

        // Only the valid records are expected in the base table.
        Dataset<Row> writtenRows = sparkSession.read().format("hudi").load(this.tableBasePath);
        long expectedRowCount = validRecords;
        Assertions.assertEquals(expectedRowCount, writtenRows.count());

        // Exactly one instant is expected on the active timeline after the single sync.
        Configuration hadoopConf = jsc.hadoopConfiguration();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
            .setBasePath(this.tableBasePath)
            .build();
        Assertions.assertEquals(1, metaClient.getActiveTimeline().getInstants().size());

        if (this.withErrorTable) {
            TestHoodieDeltaStreamerSchemaEvolutionExtensive.validateErrorTable(errorRecords, this.writeErrorTableInParallelWithBaseTable);
        }
    }

    /**
     * Supplies the argument matrix for {@link #testErrorTableWriteFlow}.
     *
     * <p>Each entry is (totalRecords, numErrorRecords, numSourceFiles, write operation,
     * writeErrorTableInParallel).
     *
     * @return the stream of test arguments, in declaration order
     */
    protected static Stream<Arguments> testErrorTableWriteFlowArgs() {
        return Stream.of(
            // Empty source.
            Arguments.of(0, 0, 0, WriteOperationType.INSERT, true),
            Arguments.of(0, 0, 0, WriteOperationType.INSERT, false),
            // 100 records, 5 of them bad, for each write operation type.
            Arguments.of(100, 5, 1, WriteOperationType.INSERT, true),
            Arguments.of(100, 5, 1, WriteOperationType.INSERT, false),
            Arguments.of(100, 5, 1, WriteOperationType.UPSERT, true),
            Arguments.of(100, 5, 1, WriteOperationType.UPSERT, false),
            Arguments.of(100, 5, 1, WriteOperationType.BULK_INSERT, true),
            Arguments.of(100, 5, 1, WriteOperationType.BULK_INSERT, false));
    }

    /**
     * Configures the fixture for a copy-on-write table with the error table enabled and
     * delegates to {@link #testBase}.
     *
     * @param totalRecords total number of source records (valid plus error records)
     * @param numErrorRecords number of source records written with an empty {@code _row_key}
     * @param numSourceFiles number of source files to generate
     * @param wopType write operation used by the delta streamer
     * @param writeErrorTableInParallel value assigned to {@code writeErrorTableInParallelWithBaseTable}
     * @throws Exception if the underlying flow fails
     */
    @ParameterizedTest
    @MethodSource(value={"testErrorTableWriteFlowArgs"})
    void testErrorTableWriteFlow(int totalRecords, int numErrorRecords, int numSourceFiles, WriteOperationType wopType, boolean writeErrorTableInParallel) throws Exception {
        // Settings that vary per parameterized case.
        this.writeOperationType = wopType;
        this.writeErrorTableInParallelWithBaseTable = writeErrorTableInParallel;

        // Fixed settings for this flow.
        this.withErrorTable = true;
        this.tableType = "COPY_ON_WRITE";
        this.dfsSourceLimitBytes = 100_000_000;
        this.useSchemaProvider = false;
        this.useTransformer = false;
        this.rowWriterEnable = false;
        this.shouldCluster = false;
        this.shouldCompact = false;
        this.addFilegroups = false;
        this.multiLogFiles = false;

        this.testBase(Tuple3.of(totalRecords, numErrorRecords, numSourceFiles));
    }
}

