/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.kafka.KafkaLogOptions;
import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.types.RowKind;

public class KafkaLogTestUtils {
    public static final DynamicTableSource.Context SOURCE_CONTEXT = new DynamicTableSource.Context(){

        public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
            return this.createTypeInformation(TypeConversions.fromDataToLogicalType((DataType)producedDataType));
        }

        public <T> TypeInformation<T> createTypeInformation(LogicalType producedLogicalType) {
            return InternalTypeInfo.of((LogicalType)producedLogicalType);
        }

        public DynamicTableSource.DataStructureConverter createDataStructureConverter(DataType producedDataType) {
            return ScanRuntimeProviderContext.INSTANCE.createDataStructureConverter(producedDataType);
        }
    };
    public static final DynamicTableSink.Context SINK_CONTEXT = new DynamicTableSink.Context(){

        public boolean isBounded() {
            return false;
        }

        public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
            return this.createTypeInformation(TypeConversions.fromDataToLogicalType((DataType)producedDataType));
        }

        public <T> TypeInformation<T> createTypeInformation(LogicalType producedLogicalType) {
            return InternalTypeInfo.of((LogicalType)producedLogicalType);
        }

        public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType producedDataType) {
            return new SinkRuntimeProviderContext(this.isBounded()).createDataStructureConverter(producedDataType);
        }

        public Optional<int[][]> getTargetColumns() {
            return Optional.empty();
        }
    };

    public static KafkaLogStoreFactory discoverKafkaLogFactory() {
        return (KafkaLogStoreFactory)LogStoreTableFactory.discoverLogStoreFactory((ClassLoader)Thread.currentThread().getContextClassLoader(), (String)"kafka");
    }

    private static DynamicTableFactory.Context createContext(String name, RowType rowType, int[] pk, Map<String, String> options) {
        return new FactoryUtil.DefaultDynamicTableContext(ObjectIdentifier.of((String)"catalog", (String)"database", (String)name), KafkaLogTestUtils.createResolvedTable(options, rowType, pk), Collections.emptyMap(), (ReadableConfig)new Configuration(), Thread.currentThread().getContextClassLoader(), false);
    }

    static ResolvedCatalogTable createResolvedTable(Map<String, String> options, RowType rowType, int[] pk) {
        List fieldNames = rowType.getFieldNames();
        List fieldDataTypes = rowType.getChildren().stream().map(TypeConversions::fromLogicalToDataType).collect(Collectors.toList());
        CatalogTable origin = CatalogTable.of((Schema)Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(), null, Collections.emptyList(), options);
        List resolvedColumns = IntStream.range(0, fieldNames.size()).mapToObj(i -> Column.physical((String)((String)fieldNames.get(i)), (DataType)((DataType)fieldDataTypes.get(i)))).collect(Collectors.toList());
        UniqueConstraint constraint = null;
        if (pk.length > 0) {
            List pkNames = Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
            constraint = UniqueConstraint.primaryKey((String)"pk", pkNames);
        }
        return new ResolvedCatalogTable(origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), constraint));
    }

    public static DynamicTableFactory.Context testContext(String servers, CoreOptions.LogChangelogMode changelogMode, boolean keyed) {
        return KafkaLogTestUtils.testContext(servers, changelogMode, keyed, Collections.emptyMap());
    }

    public static DynamicTableFactory.Context testContext(String servers, CoreOptions.LogChangelogMode changelogMode, boolean keyed, Map<String, String> dynamicOptions) {
        return KafkaLogTestUtils.testContext("table", servers, changelogMode, CoreOptions.LogConsistency.TRANSACTIONAL, keyed, dynamicOptions);
    }

    static DynamicTableFactory.Context testContext(String name, String servers, CoreOptions.LogChangelogMode changelogMode, CoreOptions.LogConsistency consistency, boolean keyed, Map<String, String> dynamicOptions) {
        int[] nArray;
        RowType rowType = RowType.of((LogicalType[])new LogicalType[]{new IntType(), new IntType()});
        if (keyed) {
            int[] nArray2 = new int[1];
            nArray = nArray2;
            nArray2[0] = 0;
        } else {
            nArray = new int[]{};
        }
        return KafkaLogTestUtils.testContext(name, servers, changelogMode, consistency, rowType, nArray, dynamicOptions);
    }

    public static DynamicTableFactory.Context testContext(String name, String servers, CoreOptions.LogChangelogMode changelogMode, CoreOptions.LogConsistency consistency, RowType type, int[] keys, Map<String, String> dynamicOptions) {
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(CoreOptions.LOG_CHANGELOG_MODE.key(), changelogMode.toString());
        options.put(CoreOptions.LOG_CONSISTENCY.key(), consistency.toString());
        options.put(KafkaLogOptions.BOOTSTRAP_SERVERS.key(), servers);
        options.put(KafkaLogOptions.TOPIC.key(), UUID.randomUUID().toString());
        options.putAll(dynamicOptions);
        return KafkaLogTestUtils.createContext(name, type, keys, options);
    }

    static SinkRecord testRecord(boolean hasPk, int bucket, int pk, int value, RowKind rowKind) {
        return new SinkRecord(BinaryRow.EMPTY_ROW, bucket, hasPk ? MergeTreeCompactManagerTest.row((int)pk) : BinaryRow.EMPTY_ROW, (InternalRow)GenericRow.ofKind((RowKind)rowKind, (Object[])new Object[]{pk, value}));
    }

    static String createTableWithKafkaLog(List<String> fieldsSpec, List<String> primaryKeys, List<String> partitionKeys, boolean manuallyCreateLogTable) {
        final String topic = "topic_" + UUID.randomUUID();
        ArrayList<String> bucketKeys = new ArrayList<String>();
        if (primaryKeys.isEmpty()) {
            for (String fieldSpec : fieldsSpec) {
                String fieldName = fieldSpec.split(" ")[0];
                if (partitionKeys.contains(fieldName) || "WATERMARK".equalsIgnoreCase(fieldName) || fieldSpec.contains(" AS ")) continue;
                bucketKeys.add(fieldName);
            }
        }
        String table = ReadWriteTableTestUtil.createTable(fieldsSpec, primaryKeys, bucketKeys, partitionKeys, (Map)new HashMap<String, String>(){
            {
                this.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
                this.put(KafkaLogOptions.BOOTSTRAP_SERVERS.key(), KafkaTableTestBase.getBootstrapServers());
                this.put(KafkaLogOptions.TOPIC.key(), topic);
                this.put(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
            }
        });
        if (manuallyCreateLogTable) {
            KafkaTableTestBase.createTopicIfNotExists(topic, 1);
        }
        return table;
    }
}

