/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.MarkPartitionDoneAction;
import org.apache.paimon.flink.sink.listener.MockCustomPartitionMarkDoneAction;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.actions.HttpReportMarkDoneAction;
import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.rest.TestHttpWebServer;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class MarkPartitionDoneActionITCase
extends ActionITCaseBase {
    private static final DataType[] FIELD_TYPES = new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()};
    private static final RowType ROW_TYPE = RowType.of((DataType[])FIELD_TYPES, (String[])new String[]{"partKey0", "partKey1", "dt", "value"});

    private static Stream<Arguments> testArguments() {
        return Stream.of(Arguments.of((Object[])new Object[]{true, "action"}), Arguments.of((Object[])new Object[]{false, "action"}), Arguments.of((Object[])new Object[]{true, "procedure_indexed"}), Arguments.of((Object[])new Object[]{false, "procedure_indexed"}), Arguments.of((Object[])new Object[]{true, "procedure_named"}), Arguments.of((Object[])new Object[]{false, "procedure_named"}));
    }

    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk, String invoker) throws Exception {
        FileStoreTable table = this.prepareTable(hasPk);
        switch (invoker) {
            case "action": {
                this.createAction(MarkPartitionDoneAction.class, "mark_partition_done", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--partition", "partKey0=0").run();
                break;
            }
            case "procedure_indexed": {
                this.executeSQL(String.format("CALL sys.mark_partition_done('%s.%s', 'partKey0 = 0')", this.database, this.tableName));
                break;
            }
            case "procedure_named": {
                this.executeSQL(String.format("CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0 = 0')", this.database, this.tableName));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        Path successPath = new Path(table.location(), "partKey0=0/_SUCCESS");
        SuccessFile successFile = SuccessFile.safelyFromPath((FileIO)table.fileIO(), (Path)successPath);
        Assertions.assertThat((Object)successFile).isNotNull();
    }

    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String invoker) throws Exception {
        FileStoreTable table = this.prepareTable(hasPk);
        switch (invoker) {
            case "action": {
                this.createAction(MarkPartitionDoneAction.class, "mark_partition_done", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--partition", "partKey0=0,partKey1=1", "--partition", "partKey0=1,partKey1=0").run();
                break;
            }
            case "procedure_indexed": {
                this.executeSQL(String.format("CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", this.database, this.tableName));
                break;
            }
            case "procedure_named": {
                this.executeSQL(String.format("CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", this.database, this.tableName));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS");
        SuccessFile successFile1 = SuccessFile.safelyFromPath((FileIO)table.fileIO(), (Path)successPath1);
        Assertions.assertThat((Object)successFile1).isNotNull();
        Path successPath2 = new Path(table.location(), "partKey0=1/partKey1=0/_SUCCESS");
        SuccessFile successFile2 = SuccessFile.safelyFromPath((FileIO)table.fileIO(), (Path)successPath2);
        Assertions.assertThat((Object)successFile2).isNotNull();
    }

    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception {
        MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().clear();
        HashMap<String, String> options = new HashMap<String, String>(2);
        options.put(CoreOptions.PARTITION_MARK_DONE_ACTION.key(), CoreOptions.PartitionMarkDoneAction.SUCCESS_FILE + "," + CoreOptions.PartitionMarkDoneAction.CUSTOM);
        options.put(CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS.key(), MockCustomPartitionMarkDoneAction.class.getName());
        FileStoreTable table = this.prepareTable(hasPk, options);
        String fullTableName = table.fullName();
        switch (invoker) {
            case "action": {
                this.createAction(MarkPartitionDoneAction.class, "mark_partition_done", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--partition", "partKey0=0,partKey1=1", "--partition", "partKey0=1,partKey1=0").run();
                break;
            }
            case "procedure_indexed": {
                this.executeSQL(String.format("CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", this.database, this.tableName));
                break;
            }
            case "procedure_named": {
                this.executeSQL(String.format("CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", this.database, this.tableName));
                break;
            }
            default: {
                throw new UnsupportedOperationException(invoker);
            }
        }
        Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS");
        SuccessFile successFile1 = SuccessFile.safelyFromPath((FileIO)table.fileIO(), (Path)successPath1);
        Assertions.assertThat((Object)successFile1).isNotNull();
        Path successPath2 = new Path(table.location(), "partKey0=1/partKey1=0/_SUCCESS");
        SuccessFile successFile2 = SuccessFile.safelyFromPath((FileIO)table.fileIO(), (Path)successPath2);
        Assertions.assertThat((Object)successFile2).isNotNull();
        Assertions.assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions()).containsExactlyInAnyOrder((Object[])new String[]{"table=" + fullTableName + ",partition=partKey0=0/partKey1=1/", "table=" + fullTableName + ",partition=partKey0=1/partKey1=0/"});
    }

    @ParameterizedTest
    @MethodSource(value={"testArguments"})
    public void testHttpReportPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception {
        TestHttpWebServer server = new TestHttpWebServer("");
        server.start();
        try {
            HashMap<String, String> options = new HashMap<String, String>();
            options.put(CoreOptions.PARTITION_MARK_DONE_ACTION.key(), CoreOptions.PartitionMarkDoneAction.SUCCESS_FILE + "," + CoreOptions.PartitionMarkDoneAction.HTTP_REPORT);
            options.put(CoreOptions.PARTITION_MARK_DONE_ACTION_URL.key(), server.getBaseUrl());
            FileStoreTable table = this.prepareTable(hasPk, options);
            String expectResponse = "{\"result\":\"success\"}";
            server.enqueueResponse(expectResponse, Integer.valueOf(200));
            server.enqueueResponse(expectResponse, Integer.valueOf(200));
            switch (invoker) {
                case "action": {
                    this.createAction(MarkPartitionDoneAction.class, "mark_partition_done", "--warehouse", this.warehouse, "--database", this.database, "--table", this.tableName, "--partition", "partKey0=0,partKey1=1", "--partition", "partKey0=1,partKey1=0").run();
                    break;
                }
                case "procedure_indexed": {
                    this.executeSQL(String.format("CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", this.database, this.tableName));
                    break;
                }
                case "procedure_named": {
                    this.executeSQL(String.format("CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')", this.database, this.tableName));
                    break;
                }
                default: {
                    throw new UnsupportedOperationException(invoker);
                }
            }
            Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS");
            SuccessFile successFile1 = SuccessFile.safelyFromPath((FileIO)table.fileIO(), (Path)successPath1);
            Assertions.assertThat((Object)successFile1).isNotNull();
            Path successPath2 = new Path(table.location(), "partKey0=1/partKey1=0/_SUCCESS");
            SuccessFile successFile2 = SuccessFile.safelyFromPath((FileIO)table.fileIO(), (Path)successPath2);
            Assertions.assertThat((Object)successFile2).isNotNull();
            RecordedRequest recordedRequest = server.takeRequest(10L, TimeUnit.SECONDS);
            RecordedRequest recordedRequest2 = server.takeRequest(10L, TimeUnit.SECONDS);
            MarkPartitionDoneActionITCase.assertRequest(server, table, recordedRequest, "partKey0=0/partKey1=1/");
            MarkPartitionDoneActionITCase.assertRequest(server, table, recordedRequest2, "partKey0=1/partKey1=0/");
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            server.stop();
        }
    }

    public static void assertRequest(TestHttpWebServer server, FileStoreTable table, RecordedRequest recordedRequest, String exceptPartition) throws JsonProcessingException {
        String requestBody = recordedRequest.getBody().readUtf8();
        HttpReportMarkDoneAction.HttpReportMarkDoneRequest request = (HttpReportMarkDoneAction.HttpReportMarkDoneRequest)server.readRequestBody(requestBody, HttpReportMarkDoneAction.HttpReportMarkDoneRequest.class);
        Assertions.assertThat((request.getPath().equals(table.location().toString()) && request.getPartition().equals(exceptPartition) && request.getTable().equals(table.fullName()) ? 1 : 0) != 0).isTrue();
    }

    private FileStoreTable prepareTable(boolean hasPk) throws Exception {
        return this.prepareTable(hasPk, Collections.emptyMap());
    }

    private FileStoreTable prepareTable(boolean hasPk, Map<String, String> options) throws Exception {
        FileStoreTable table = this.createFileStoreTable(ROW_TYPE, Arrays.asList("partKey0", "partKey1"), hasPk ? Arrays.asList("partKey0", "partKey1", "dt") : Collections.emptyList(), hasPk ? Collections.emptyList() : Collections.singletonList("dt"), options);
        SnapshotManager snapshotManager = table.snapshotManager();
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.commitUser);
        this.write = streamWriteBuilder.newWrite();
        this.commit = streamWriteBuilder.newCommit();
        this.writeData(this.rowData(0, 0, BinaryString.fromString((String)"2023-01-12"), 101), this.rowData(0, 0, BinaryString.fromString((String)"2023-01-12"), 102), this.rowData(0, 0, BinaryString.fromString((String)"2023-01-13"), 103));
        this.writeData(this.rowData(0, 1, BinaryString.fromString((String)"2023-01-14"), 110), this.rowData(0, 1, BinaryString.fromString((String)"2023-01-15"), 120), this.rowData(0, 1, BinaryString.fromString((String)"2023-01-16"), 130));
        this.writeData(this.rowData(1, 0, BinaryString.fromString((String)"2023-01-17"), 2), this.rowData(1, 0, BinaryString.fromString((String)"2023-01-17"), 3), this.rowData(1, 0, BinaryString.fromString((String)"2023-01-17"), 5));
        this.writeData(this.rowData(1, 1, BinaryString.fromString((String)"2023-01-18"), 82), this.rowData(1, 1, BinaryString.fromString((String)"2023-01-19"), 90), this.rowData(1, 1, BinaryString.fromString((String)"2023-01-20"), 97));
        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assertions.assertThat((long)snapshot.id()).isEqualTo(4L);
        Assertions.assertThat((Comparable)snapshot.commitKind()).isEqualTo((Object)Snapshot.CommitKind.APPEND);
        return table;
    }
}

