/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.examples;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.response.Response;
import io.restassured.response.ValidatableResponse;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.test.quarkus.kafka.KafkaTestClient;

@QuarkusTest
public class OutboxIT {
    private static final Duration INITIAL_TIMEOUT = Duration.ofSeconds(250L);
    private static final Duration TIMEOUT = Duration.ofSeconds(25L);
    private static final Duration INTERVAL = Duration.ofSeconds(5L);
    private static final String PROCESS_EVENTS_TOPIC = "kogito-processinstances-events";
    private static final String USERTASK_EVENTS_TOPIC = "kogito-usertaskinstances-events";
    @ConfigProperty(name="kogito.port")
    private int kogitoPort;
    @ConfigProperty(name="debezium.port")
    private int debeziumPort;
    @ConfigProperty(name="kafka.port")
    private int kafkaPort;
    private KafkaTestClient kafkaClient;

    @BeforeEach
    void setup() {
        this.kafkaClient = new KafkaTestClient("localhost:" + this.kafkaPort);
    }

    @AfterEach
    void close() {
        if (this.kafkaClient != null) {
            this.kafkaClient.shutdown();
        }
    }

    @Test
    public void testSendProcessEvents() {
        Awaitility.given().ignoreExceptions().await().atMost(INITIAL_TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            ((ValidatableResponse)((Response)RestAssured.given().port(this.kogitoPort).when().get("/orders", new Object[0])).then()).statusCode(200);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(INITIAL_TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().port(this.debeziumPort).pathParam("connector", (Object)"kogito-connector").pathParam("task", (Object)0).when().get("/connectors/{connector}/tasks/{task}/status", new Object[0])).then()).statusCode(200)).assertThat()).body("state", Matchers.equalTo((Object)"RUNNING"), new Object[0]);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().port(this.debeziumPort).pathParam("connector", (Object)"kogito-connector").when().get("/connectors/{connector}/topics", new Object[0])).then()).statusCode(200)).assertThat()).body("kogito-connector.topics", Matchers.hasSize((int)0), new Object[0]);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().port(this.kogitoPort).header("Content-Type", (Object)"application/json", new Object[0]).body("{\"approver\" : \"john\", \"order\" : {\"orderNumber\" : \"23570\", \"shipped\" : false}}").when().post("/orders", new Object[0])).then()).statusCode(201)).assertThat()).body("approver", Matchers.equalTo((Object)"john"), new Object[0])).body("order.orderNumber", Matchers.equalTo((Object)"23570"), new Object[0])).body("order.shipped", Matchers.equalTo((Object)false), new Object[0]);
            return true;
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> {
            ((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((ValidatableResponse)((Response)RestAssured.given().port(this.debeziumPort).pathParam("connector", (Object)"kogito-connector").when().get("/connectors/{connector}/topics", new Object[0])).then()).statusCode(200)).assertThat()).body("kogito-connector.topics", Matchers.hasSize((int)2), new Object[0])).body("kogito-connector.topics", Matchers.hasItem((Object)PROCESS_EVENTS_TOPIC), new Object[0])).body("kogito-connector.topics", Matchers.hasItem((Object)USERTASK_EVENTS_TOPIC), new Object[0]);
            return true;
        });
        AtomicInteger processEventCounter = new AtomicInteger(0);
        this.kafkaClient.consume(PROCESS_EVENTS_TOPIC, message -> {
            String orderNumber = (String)JsonPath.read((String)message, (String)"$.data.variables.order.orderNumber", (Predicate[])new Predicate[0]);
            boolean shipped = (Boolean)JsonPath.read((String)message, (String)"$.data.variables.order.shipped", (Predicate[])new Predicate[0]);
            if ("23570".equals(orderNumber) && !shipped) {
                processEventCounter.incrementAndGet();
            }
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> processEventCounter.intValue() == 2);
        AtomicInteger usertaskEventCounter = new AtomicInteger(0);
        this.kafkaClient.consume(USERTASK_EVENTS_TOPIC, message -> {
            String orderNumber = (String)JsonPath.read((String)message, (String)"$.data.inputs.input1.orderNumber", (Predicate[])new Predicate[0]);
            boolean shipped = (Boolean)JsonPath.read((String)message, (String)"$.data.inputs.input1.shipped", (Predicate[])new Predicate[0]);
            if ("23570".equals(orderNumber) && !shipped) {
                usertaskEventCounter.incrementAndGet();
            }
        });
        Awaitility.given().ignoreExceptions().await().atMost(TIMEOUT).with().pollInterval(INTERVAL).until(() -> usertaskEventCounter.intValue() == 1);
    }
}

