/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources.debezium;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource;
import org.apache.hudi.utilities.sources.debezium.TestAbstractDebeziumSource;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;

public class TestPostgresDebeziumSource
extends TestAbstractDebeziumSource {
    private static final String POSTGRES_GITHUB_SCHEMA = "{\"connect.name\": \"postgres.ghschema.gharchive.Envelope\",\n  \"fields\": [{\"default\": null,\"name\": \"before\",\"type\": [\"null\",{\"connect.name\": \"postgres.ghschema.gharchive.Value\",\n  \"fields\": [{\"name\": \"id\",\"type\": \"string\"},{\"name\": \"date\",\"type\": \"string\"},{\"default\": null,\"name\": \"timestamp\",\n  \"type\": [\"null\",\"long\"]},{\"default\": null,\"name\": \"type\",\"type\": [\"null\",\"string\"]},{\"default\": null,\"name\": \"payload\",\n  \"type\": [\"null\",\"string\"]},{\"default\": null,\"name\": \"org\",\"type\": [\"null\",\"string\"]},{\"default\": null,\"name\": \"created_at\",\n  \"type\": [\"null\",\"long\"]},{\"default\": null,\"name\": \"public\",\"type\": [\"null\",\"boolean\"]}],\"name\": \"Value\",\"type\": \"record\"\n  }]},{\"default\": null,\"name\": \"after\",\"type\": [\"null\",\"Value\"]},{\"name\": \"source\",\"type\": {\"connect.name\": \"io.debezium.connector.postgresql.Source\",\n  \"fields\": [{\"name\": \"connector\",\"type\": \"string\"},{\"name\": \"name\",\"type\": \"string\"},{\"name\": \"ts_ms\",\"type\": \"long\"},\n  {\"name\": \"db\",\"type\": \"string\"},{\"name\": \"schema\",\"type\": \"string\"},{\"name\": \"table\",\"type\": \"string\"},{\"default\": null,\n  \"name\": \"txId\",\"type\": [\"null\",\"long\"]},{\"default\": null,\"name\": \"lsn\",\"type\": [\"null\",\"long\"]},{\"default\": null,\n  \"name\": \"xmin\",\"type\": [\"null\",\"long\"]}],\"name\": \"Source\",\"namespace\": \"io.debezium.connector.postgresql\",\"type\": \"record\"\n  }},{\"name\": \"op\",\"type\": \"string\"},{\"default\": null,\"name\": \"ts_ms\",\"type\": [\"null\",\"long\"]},{\"default\": null,\"name\": \"transaction\",\n  \"type\": [\"null\",{\"fields\": [{\"name\": \"id\",\"type\": \"string\"},{\"name\": \"total_order\",\"type\": \"long\"},{\"name\": \"data_collection_order\",\n  \"type\": \"long\"}],\"name\": \"ConnectDefault\",\"namespace\": \"io.confluent.connect.avro\",\"type\": \"record\"}]}],\"name\": \"Envelope\",\n  \"namespace\": \"postgres.ghschema.gharchive\",\"type\": \"record\"}";
    private static final String TEST_DB = "postgres";
    private static final String TEST_SCHEMA = "ghschema";
    private static final String TEST_TABLE = "gharchive";
    private static final long TEST_TS_MS = 12345L;
    private static final long TEST_TXID = 543L;
    private static final long TEST_LSN = 98765L;

    @Override
    protected String getIndexName() {
        return TEST_DB;
    }

    @Override
    protected String getSourceClass() {
        return PostgresDebeziumSource.class.getName();
    }

    @Override
    protected String getSchema() {
        return POSTGRES_GITHUB_SCHEMA;
    }

    @Override
    protected GenericRecord generateMetaFields(GenericRecord rec) {
        Schema schema = new Schema.Parser().parse(this.getSchema());
        GenericData.Record sourceRecord = new GenericData.Record(schema.getField("source").schema());
        sourceRecord.put("name", (Object)this.getIndexName());
        sourceRecord.put("connector", (Object)this.getIndexName());
        sourceRecord.put("db", (Object)TEST_DB);
        sourceRecord.put("schema", (Object)TEST_SCHEMA);
        sourceRecord.put("table", (Object)TEST_TABLE);
        sourceRecord.put("ts_ms", (Object)12345L);
        sourceRecord.put("txId", (Object)543L);
        sourceRecord.put("lsn", (Object)98765L);
        rec.put("source", (Object)sourceRecord);
        return rec;
    }

    @Override
    protected void validateMetaFields(Dataset<Row> records) {
        Assertions.assertTrue((boolean)records.select("_event_origin_ts_ms", new String[0]).collectAsList().stream().allMatch(r -> r.getLong(0) == 12345L));
        Assertions.assertTrue((boolean)records.select("_event_tx_id", new String[0]).collectAsList().stream().allMatch(r -> r.getLong(0) == 543L));
        Assertions.assertTrue((boolean)records.select("_event_lsn", new String[0]).collectAsList().stream().allMatch(r -> r.getLong(0) == 98765L));
    }
}

