/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration test that runs a batch {@code INSERT OVERWRITE} back-fill against a partition of a
 * Kafka-logged table while a streaming {@code INSERT INTO} job is writing to the same table and a
 * streaming reader is consuming it.
 */
public class StreamingWarehouseITCase extends KafkaTableTestBase {
    /** Pause between two job-status polls while waiting for the streaming job to start. */
    private static final long STATUS_POLL_INTERVAL_MS = 100L;

    /**
     * Maps a 9-field result {@link Row} of {@code cleaned_trade_order} onto a {@link
     * CleanedTradeOrder}, field by field in column order.
     */
    private static final Function<Row, CleanedTradeOrder> ORDER_CONVERTER = row -> {
        // Explicit checks instead of the `assert` keyword so they also run without -ea.
        Assertions.assertThat(row).isNotNull();
        Assertions.assertThat(row.getArity()).isEqualTo(9);
        CleanedTradeOrder order = new CleanedTradeOrder();
        order.orderId = (Long) row.getField(0);
        order.orderTimestamp = (LocalDateTime) row.getField(1);
        order.buyerId = (String) row.getField(2);
        order.orderAmount = (Double) row.getField(3);
        order.loyaltyDiscount = (Double) row.getField(4);
        order.shippingFee = (Double) row.getField(5);
        order.orderVerified = (Boolean) row.getField(6);
        order.actualGmv = (Double) row.getField(7);
        order.dt = (String) row.getField(8);
        return order;
    };

    /**
     * End-to-end scenario: write corrupted historical data in batch mode, read it through a
     * streaming query, start a continuous streaming insert, correct the historical partition with
     * a batch overwrite, and finally verify both the streaming and the batch view.
     *
     * @throws Exception if any SQL job fails, a wait is interrupted, or cleanup fails
     */
    @Test
    public void testUserStory() throws Exception {
        ReadWriteTableTestUtil.init(this.createAndRegisterTempFile("").toString(), 1);

        // Step 1: define the datagen source and the partitioned sink table backed by Kafka.
        String orderSource =
                "CREATE TEMPORARY TABLE IF NOT EXISTS trade_orders (\n"
                        + "    order_id BIGINT NOT NULL,\n"
                        + "    order_timestamp AS LOCALTIMESTAMP,\n"
                        + "    buyer_id STRING,\n"
                        + "    order_amount DOUBLE,\n"
                        + "    loyalty_discount DOUBLE,\n"
                        + "    shipping_fee DOUBLE,\n"
                        + "    order_verified BOOLEAN,\n"
                        + "    PRIMARY KEY (order_id) NOT ENFORCED\n"
                        + "  )\n"
                        + "WITH (\n"
                        + "    'connector' = 'datagen',\n"
                        + "    'rows-per-second' = '10',\n"
                        + "    'fields.order_id.kind' = 'random',\n"
                        + "    'fields.order_id.min' = '1',\n"
                        + "    'fields.buyer_id.kind' = 'random',\n"
                        + "    'fields.buyer_id.length' = '3',\n"
                        + "    'fields.order_amount.min' = '10',\n"
                        + "    'fields.order_amount.max' = '1000',\n"
                        + "    'fields.loyalty_discount.min' = '0',\n"
                        + "    'fields.loyalty_discount.max' = '10',\n"
                        + "    'fields.shipping_fee.min' = '5',\n"
                        + "    'fields.shipping_fee.max' = '20'\n"
                        + "  );";
        String cleanedOrders =
                String.format(
                        "CREATE TABLE IF NOT EXISTS cleaned_trade_order (\n"
                                + "    order_id BIGINT NOT NULL,\n"
                                + "    order_timestamp TIMESTAMP (3),\n"
                                + "    buyer_id STRING,\n"
                                + "    order_amount DOUBLE,\n"
                                + "    loyalty_discount DOUBLE,\n"
                                + "    shipping_fee DOUBLE,\n"
                                + "    order_verified BOOLEAN,\n"
                                + "    actual_gmv DOUBLE,\n"
                                + "    dt STRING,\n"
                                + "    PRIMARY KEY (dt, order_id) NOT ENFORCED\n"
                                + "  )\n"
                                + "PARTITIONED BY (dt)\n"
                                + "WITH (\n"
                                + "    'bucket' = '1',\n"
                                + "    'log.system' = 'kafka',\n"
                                + "    'kafka.bootstrap.servers' = '%s',\n"
                                + "    'kafka.topic' = 'cleaned_trade_order');",
                        StreamingWarehouseITCase.getBootstrapServers());
        // The temporary source is registered in both environments; the sink only via sEnv.
        ReadWriteTableTestUtil.sEnv.executeSql(orderSource);
        ReadWriteTableTestUtil.bEnv.executeSql(orderSource);
        ReadWriteTableTestUtil.sEnv.executeSql(cleanedOrders);

        // Step 2: batch-write 50 rows into partition 2022-04-14; verified orders with an odd id
        // get a bogus buyer ('404NotFound') and a bogus GMV (-1).
        String corruptedHistoricalData =
                "INSERT INTO cleaned_trade_order\n"
                        + "PARTITION (dt = '2022-04-14')\n"
                        + "SELECT order_id,\n"
                        + "  TIMESTAMPADD (\n"
                        + "    HOUR,\n"
                        + "    RAND_INTEGER (24),\n"
                        + "    TO_TIMESTAMP ('2022-04-14', 'yyyy-MM-dd')\n"
                        + "  ) AS order_timestamp,\n"
                        + "  IF (\n"
                        + "    order_verified\n"
                        + "    AND order_id % 2 = 1,\n"
                        + "    '404NotFound',\n"
                        + "    buyer_id\n"
                        + "  ) AS buyer_id,\n"
                        + "  order_amount,\n"
                        + "  loyalty_discount,\n"
                        + "  shipping_fee,\n"
                        + "  order_verified,\n"
                        + "  IF (\n"
                        + "    order_verified\n"
                        + "    AND order_id % 2 = 1,\n"
                        + "    -1,\n"
                        + "    order_amount + shipping_fee - loyalty_discount\n"
                        + "  ) AS actual_gmv\n"
                        + "FROM\n"
                        + "  trade_orders\n"
                        + "  /*+ OPTIONS ('number-of-rows' = '50')  */";
        ReadWriteTableTestUtil.bEnv.executeSql(corruptedHistoricalData).await();

        // Step 3: start the downstream streaming read.
        String streamingRead = "SELECT * FROM cleaned_trade_order";
        BlockingIterator<Row, CleanedTradeOrder> streamIter =
                BlockingIterator.of(
                        ReadWriteTableTestUtil.sEnv.executeSql(streamingRead).collect(),
                        ORDER_CONVERTER);
        JobClient dailyTaskHandler = null;
        try {
            // The streaming reader must first see the corrupted historical rows.
            streamIter.collect(50).stream()
                    .filter(StreamingWarehouseITCase::isVerifiedWithOddId)
                    .forEach(
                            order -> {
                                Assertions.assertThat(order.buyerId).isEqualTo("404NotFound");
                                Assertions.assertThat(order.actualGmv).isEqualTo(-1.0);
                                Assertions.assertThat(order.dt).isEqualTo("2022-04-14");
                            });

            // Step 4: start the continuous streaming insert that derives dt from the timestamp.
            String streamingWrite =
                    "INSERT INTO cleaned_trade_order\n"
                            + "SELECT order_id,\n"
                            + "  order_timestamp,\n"
                            + "  buyer_id,\n"
                            + "  order_amount,\n"
                            + "  loyalty_discount,\n"
                            + "  shipping_fee,\n"
                            + "  order_verified,\n"
                            + "  order_amount + shipping_fee - loyalty_discount AS actual_gmv,\n"
                            + "  DATE_FORMAT (order_timestamp, 'yyyy-MM-dd') AS dt\n"
                            + "FROM\n"
                            + "  trade_orders";
            JobClient submittedJob =
                    ReadWriteTableTestUtil.sEnv
                            .executeSql(streamingWrite)
                            .getJobClient()
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    "No JobClient returned for the streaming write job"));
            waitUntilRunning(submittedJob);
            // Only a job that actually reached RUNNING is registered for cancellation, so a
            // start-up failure is reported as such instead of being masked by a failing cancel.
            dailyTaskHandler = submittedJob;

            // Step 5: back-fill the historical partition with corrected values and wait for it.
            String backFillOverwrite =
                    "INSERT OVERWRITE cleaned_trade_order\n"
                            + "SELECT order_id,\n"
                            + "  order_timestamp,\n"
                            + "  IF (buyer_id = '404NotFound', '_ANONYMOUS_USER_', buyer_id) AS buyer_id,\n"
                            + "  order_amount,\n"
                            + "  loyalty_discount,\n"
                            + "  shipping_fee,\n"
                            + "  order_verified,\n"
                            + "  IF (\n"
                            + "    actual_gmv = -1,\n"
                            + "    order_amount + shipping_fee - loyalty_discount,\n"
                            + "    actual_gmv\n"
                            + "  ) AS actual_gmv,\n"
                            + "  dt\n"
                            + "FROM\n"
                            + "  cleaned_trade_order\n"
                            + "WHERE\n"
                            + "  dt = '2022-04-14';";
            ReadWriteTableTestUtil.bEnv.executeSql(backFillOverwrite).await();

            // Step 6: the next 200 streamed records (20 rounds of 10, one second apart) must all
            // belong to a partition later than the back-filled one.
            for (int checkSize = 200; checkSize > 0; checkSize -= 10) {
                Thread.sleep(1000L);
                streamIter
                        .collect(10)
                        .forEach(
                                order ->
                                        Assertions.assertThat(order.dt)
                                                .isGreaterThan("2022-04-14"));
            }

            // Step 7: a batch read of the historical partition must show the corrected values.
            BlockingIterator<Row, CleanedTradeOrder> batchIter =
                    BlockingIterator.of(
                            ReadWriteTableTestUtil.bEnv
                                    .executeSql(
                                            "SELECT * FROM cleaned_trade_order WHERE dt ='2022-04-14'")
                                    .collect(),
                            ORDER_CONVERTER);
            try {
                batchIter.collect(50).stream()
                        .filter(StreamingWarehouseITCase::isVerifiedWithOddId)
                        .forEach(
                                order -> {
                                    Assertions.assertThat(order.buyerId)
                                            .isEqualTo("_ANONYMOUS_USER_");
                                    Assertions.assertThat(order.actualGmv)
                                            .isEqualTo(
                                                    order.orderAmount
                                                            + order.shippingFee
                                                            - order.loyaltyDiscount);
                                    Assertions.assertThat(order.dt).isEqualTo("2022-04-14");
                                });
            } finally {
                batchIter.close();
            }
        } finally {
            // Release the streaming reader and stop the unbounded insert job even when an
            // assertion above failed; otherwise they would outlive the test.
            try {
                streamIter.close();
            } finally {
                if (dailyTaskHandler != null) {
                    dailyTaskHandler.cancel().get();
                }
            }
        }
    }

    /**
     * Blocks until the given job reports {@link JobStatus#RUNNING}, polling its status at a fixed
     * interval instead of spinning.
     *
     * @param jobClient handle of the submitted job
     * @throws IllegalStateException if the job reaches a globally terminal state first
     * @throws Exception if the status query fails or the waiting thread is interrupted
     */
    private static void waitUntilRunning(JobClient jobClient) throws Exception {
        JobStatus status = jobClient.getJobStatus().get();
        while (status != JobStatus.RUNNING) {
            if (status.isGloballyTerminalState()) {
                throw new IllegalStateException(
                        "Streaming write job ended with status "
                                + status
                                + " before reaching RUNNING");
            }
            Thread.sleep(STATUS_POLL_INTERVAL_MS);
            status = jobClient.getJobStatus().get();
        }
    }

    /**
     * Tells whether an order matches the condition the corrupting insert uses: it is verified and
     * its id is odd.
     */
    private static boolean isVerifiedWithOddId(CleanedTradeOrder order) {
        return order.orderVerified && order.orderId % 2L == 1L;
    }

    /** Plain holder for one row of {@code cleaned_trade_order}; populated by ORDER_CONVERTER. */
    private static class CleanedTradeOrder {
        private Long orderId;
        private LocalDateTime orderTimestamp;
        private String buyerId;
        private Double orderAmount;
        private Double loyaltyDiscount;
        private Double shippingFee;
        private Boolean orderVerified;
        private Double actualGmv;
        private String dt;

        private CleanedTradeOrder() {
        }
    }
}

