/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.lineage.DataLineageEntity;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.lineage.TableLineageEntity;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.predicate.Predicate;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration test for table lineage: the catalog is configured with the lineage meta identifier
 * returned by {@link TestingMemoryLineageMetaFactory}, whose {@link LineageMeta} keeps table
 * lineage entries in static in-memory maps so the test can inspect them directly.
 *
 * <p>NOTE(review): how the factory is discovered from its identifier is not visible in this file
 * (presumably a service-loader registration) - confirm before renaming the class or identifier.
 */
public class FlinkLineageITCase extends CatalogITCaseBase {
    /** Identifier shared by {@link #catalogOptions()} and the testing factory. */
    private static final String THROWING_META = "throwing-meta";

    /** Source table lineages recorded by the testing meta: job name -> (lineage key -> entity). */
    private static final Map<String, Map<String, TableLineageEntity>> jobSourceTableLineages =
            new HashMap<>();

    /** Sink table lineages recorded by the testing meta: job name -> (lineage key -> entity). */
    private static final Map<String, Map<String, TableLineageEntity>> jobSinkTableLineages =
            new HashMap<>();

    @Override
    protected List<String> ddl() {
        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)");
    }

    @Override
    protected Map<String, String> catalogOptions() {
        return Collections.singletonMap(CatalogOptions.LINEAGE_META.key(), THROWING_META);
    }

    /**
     * Checks that statements fail while no pipeline name is configured, and that once a name is
     * set, sink and source table lineages are recorded in the testing meta and are visible through
     * the {@code sys.sink_table_lineage} / {@code sys.source_table_lineage} tables.
     */
    @Test
    public void testTableLineage() throws Exception {
        // Without a pipeline name both the write and the read are expected to be rejected.
        Assertions.assertThatThrownBy(
                        () -> this.tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await())
                .hasCauseExactlyInstanceOf(ValidationException.class)
                .hasRootCauseMessage("Cannot get pipeline name for lineage meta.");
        Assertions.assertThatThrownBy(
                        () -> this.tEnv.executeSql("SELECT * FROM T").collect().close())
                .hasCauseExactlyInstanceOf(ValidationException.class)
                .hasRootCauseMessage("Cannot get pipeline name for lineage meta.");

        // Sink lineage: name the job, write, then check both the in-memory map and the system table.
        this.tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "insert_t_job");
        this.tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
        Assertions.assertThat(jobSinkTableLineages).isNotEmpty();
        TableLineageEntity sinkTableLineage =
                jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job");
        Assertions.assertThat(sinkTableLineage.getTable()).isEqualTo("T");

        List<Row> sinkTableRows = collectRows("SELECT * FROM sys.sink_table_lineage");
        Assertions.assertThat(sinkTableRows.size()).isEqualTo(1);
        Row sinkTableRow = sinkTableRows.get(0);
        Assertions.assertThat(sinkTableRow.getField("database_name")).isEqualTo("default");
        Assertions.assertThat(sinkTableRow.getField("table_name")).isEqualTo("T");
        Assertions.assertThat(sinkTableRow.getField("job_name")).isEqualTo("insert_t_job");

        // Source lineage: same checks for a read job under a different pipeline name.
        this.tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "select_t_job");
        this.tEnv.executeSql("SELECT * FROM T").collect().close();
        Assertions.assertThat(jobSourceTableLineages).isNotEmpty();
        TableLineageEntity sourceTableLineage =
                jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job");
        Assertions.assertThat(sourceTableLineage.getTable()).isEqualTo("T");

        List<Row> sourceTableRows = collectRows("SELECT * FROM sys.source_table_lineage");
        Assertions.assertThat(sourceTableRows.size()).isEqualTo(1);
        Row sourceTableRow = sourceTableRows.get(0);
        Assertions.assertThat(sourceTableRow.getField("database_name")).isEqualTo("default");
        Assertions.assertThat(sourceTableRow.getField("table_name")).isEqualTo("T");
        Assertions.assertThat(sourceTableRow.getField("job_name")).isEqualTo("select_t_job");
    }

    /**
     * Executes {@code query} and drains its result into a list, closing the result iterator.
     *
     * @param query SQL statement to execute on the test table environment
     * @return all rows produced by the statement, in iteration order
     * @throws Exception if closing the result iterator fails
     */
    private List<Row> collectRows(String query) throws Exception {
        List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> iterator = this.tEnv.executeSql(query).collect()) {
            while (iterator.hasNext()) {
                rows.add(iterator.next());
            }
        }
        return rows;
    }

    /** Builds the inner map key for an entity as {@code database.table.job}. */
    private static String getTableLineageKey(TableLineageEntity entity) {
        return String.format("%s.%s.%s", entity.getDatabase(), entity.getTable(), entity.getJob());
    }

    /**
     * {@link LineageMeta} that stores table lineages in the static maps of the enclosing test.
     * Data lineage operations are not supported and throw {@link UnsupportedOperationException}.
     */
    private static class TestingMemoryLineageMeta implements LineageMeta {
        private TestingMemoryLineageMeta() {
        }

        @Override
        public void saveSourceTableLineage(TableLineageEntity entity) {
            jobSourceTableLineages
                    .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
                    .put(FlinkLineageITCase.getTableLineageKey(entity), entity);
        }

        @Override
        public void deleteSourceTableLineage(String job) {
            jobSourceTableLineages.remove(job);
        }

        /** Returns every stored source table lineage; the predicate is ignored. */
        @Override
        public Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate predicate) {
            return jobSourceTableLineages.values().stream()
                    .flatMap(v -> v.values().stream())
                    .iterator();
        }

        /** Stores the entity after asserting it describes the insert job on {@code default.T}. */
        @Override
        public void saveSinkTableLineage(TableLineageEntity entity) {
            Assertions.assertThat(entity.getJob()).isEqualTo("insert_t_job");
            Assertions.assertThat(entity.getTable()).isEqualTo("T");
            Assertions.assertThat(entity.getDatabase()).isEqualTo("default");
            jobSinkTableLineages
                    .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
                    .put(FlinkLineageITCase.getTableLineageKey(entity), entity);
        }

        /** Returns every stored sink table lineage; the predicate is ignored. */
        @Override
        public Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate predicate) {
            return jobSinkTableLineages.values().stream()
                    .flatMap(v -> v.values().stream())
                    .iterator();
        }

        @Override
        public void deleteSinkTableLineage(String job) {
            jobSinkTableLineages.remove(job);
        }

        /**
         * Asserts the entity describes the select job on {@code default.T}, then rejects the call.
         *
         * @throws UnsupportedOperationException always, after the assertions pass
         */
        @Override
        public void saveSourceDataLineage(DataLineageEntity entity) {
            Assertions.assertThat(entity.getJob()).isEqualTo("select_t_job");
            Assertions.assertThat(entity.getTable()).isEqualTo("T");
            Assertions.assertThat(entity.getDatabase()).isEqualTo("default");
            // The message previously named saveSinkTableLineage, which is a different method.
            throw new UnsupportedOperationException(
                    "Method saveSourceDataLineage is not supported");
        }

        @Override
        public Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate predicate) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void saveSinkDataLineage(DataLineageEntity entity) {
            throw new UnsupportedOperationException();
        }

        @Override
        public Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate predicate) {
            throw new UnsupportedOperationException();
        }

        /** Nothing to release; the recorded lineages stay in the static maps. */
        @Override
        public void close() throws Exception {
        }
    }

    /** Factory that creates {@link TestingMemoryLineageMeta} under the test identifier. */
    public static class TestingMemoryLineageMetaFactory implements LineageMetaFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public String identifier() {
            return FlinkLineageITCase.THROWING_META;
        }

        @Override
        public LineageMeta create(LineageMetaFactory.LineageMetaContext context) {
            return new TestingMemoryLineageMeta();
        }
    }
}

