/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodiePipeline {
    private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);

    public static Builder builder(String tableName) {
        return new Builder(tableName);
    }

    private static String getCreateHoodieTableDDL(String tableName, List<String> fields, Map<String, String> options, String pkField, List<String> partitionField) {
        StringBuilder builder = new StringBuilder();
        builder.append("create table ").append(tableName).append("(\n");
        for (String field : fields) {
            builder.append("  ").append(field).append(",\n");
        }
        builder.append("  PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n").append(")\n");
        if (!partitionField.isEmpty()) {
            String partitons = partitionField.stream().map(partitionName -> "`" + partitionName + "`").collect(Collectors.joining(","));
            builder.append("PARTITIONED BY (").append(partitons).append(")\n");
        }
        builder.append("with ('connector' = 'hudi'");
        options.forEach((k, v) -> builder.append(",\n").append("  '").append((String)k).append("' = '").append((String)v).append("'"));
        builder.append("\n)");
        return builder.toString();
    }

    private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
        FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, (ReadableConfig)Configuration.fromMap((Map)catalogTable.getOptions()));
        HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
        return ((DataStreamSinkProvider)hoodieTableFactory.createDynamicTableSink((DynamicTableFactory.Context)context).getSinkRuntimeProvider((DynamicTableSink.Context)new SinkRuntimeProviderContext(isBounded))).consumeDataStream(input);
    }

    private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
        FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, (ReadableConfig)Configuration.fromMap((Map)catalogTable.getOptions()));
        HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
        DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider)((ScanTableSource)hoodieTableFactory.createDynamicTableSource((DynamicTableFactory.Context)context)).getScanRuntimeProvider((ScanTableSource.ScanContext)new ScanRuntimeProviderContext());
        return dataStreamScanProvider.produceDataStream(execEnv);
    }

    public static class TableDescriptor {
        private final ObjectIdentifier tableId;
        private final ResolvedCatalogTable resolvedCatalogTable;

        public TableDescriptor(ObjectIdentifier tableId, ResolvedCatalogTable resolvedCatalogTable) {
            this.tableId = tableId;
            this.resolvedCatalogTable = resolvedCatalogTable;
        }

        public ObjectIdentifier getTableId() {
            return this.tableId;
        }

        public ResolvedCatalogTable getResolvedCatalogTable() {
            return this.resolvedCatalogTable;
        }
    }

    public static class Builder {
        private final String tableName;
        private final List<String> columns;
        private final Map<String, String> options;
        private String pk;
        private List<String> partitions;

        private Builder(String tableName) {
            this.tableName = tableName;
            this.columns = new ArrayList<String>();
            this.options = new HashMap<String, String>();
            this.partitions = new ArrayList<String>();
        }

        public Builder column(String column) {
            this.columns.add(column);
            return this;
        }

        public Builder pk(String ... pks) {
            this.pk = String.join((CharSequence)",", pks);
            return this;
        }

        public Builder partition(String ... partitions) {
            this.partitions = new ArrayList<String>(Arrays.asList(partitions));
            return this;
        }

        public Builder schema(Schema schema) {
            for (Schema.UnresolvedColumn column : schema.getColumns()) {
                this.column(column.toString());
            }
            if (schema.getPrimaryKey().isPresent()) {
                this.pk(((Schema.UnresolvedPrimaryKey)schema.getPrimaryKey().get()).getColumnNames().stream().map(EncodingUtils::escapeIdentifier).collect(Collectors.joining(", ")));
            }
            return this;
        }

        public Builder option(ConfigOption<?> option, Object val) {
            this.options.put(option.key(), val.toString());
            return this;
        }

        public Builder option(String key, Object val) {
            this.options.put(key, val.toString());
            return this;
        }

        public Builder options(Map<String, String> options) {
            this.options.putAll(options);
            return this;
        }

        public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
            TableDescriptor tableDescriptor = this.getTableDescriptor();
            return HoodiePipeline.sink((DataStream<RowData>)input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
        }

        public TableDescriptor getTableDescriptor() {
            EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().build();
            TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create((EnvironmentSettings)environmentSettings);
            String sql = HoodiePipeline.getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
            tableEnv.executeSql(sql);
            String currentCatalog = tableEnv.getCurrentCatalog();
            ResolvedCatalogTable catalogTable = null;
            String defaultDatabase = null;
            try {
                Catalog catalog = (Catalog)tableEnv.getCatalog(currentCatalog).get();
                defaultDatabase = catalog.getDefaultDatabase();
                catalogTable = (ResolvedCatalogTable)catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
            }
            catch (TableNotExistException e) {
                throw new HoodieException("Create table " + this.tableName + " exception", e);
            }
            ObjectIdentifier tableId = ObjectIdentifier.of((String)currentCatalog, (String)defaultDatabase, (String)this.tableName);
            return new TableDescriptor(tableId, catalogTable);
        }

        public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
            TableDescriptor tableDescriptor = this.getTableDescriptor();
            return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getResolvedCatalogTable());
        }
    }
}

