/*
 * Decompiled with CFR 0.152.
 */
package com.d3x.morpheus.db;

import com.d3x.morpheus.db.DbSinkOptions;
import com.d3x.morpheus.frame.DataFrame;
import com.d3x.morpheus.frame.DataFrameCursor;
import com.d3x.morpheus.frame.DataFrameException;
import com.d3x.morpheus.frame.DataFrameRow;
import com.d3x.morpheus.frame.DataFrameValue;
import com.d3x.morpheus.util.Collect;
import com.d3x.morpheus.util.Initialiser;
import com.d3x.morpheus.util.functions.Function1;
import com.d3x.morpheus.util.sql.SQLPlatform;
import com.d3x.morpheus.util.sql.SQLType;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DbSink {
    /**
     * Lookup from a Java class to the {@link SQLType} used when generating DDL and
     * when binding row-key values. Entries are registered by the static initializer
     * of this class.
     */
    private static final Map<Class<?>, SQLType> sqlTypeMap = new HashMap<>();

    /**
     * Writes the given DataFrame to the database table described by the options.
     *
     * <p>The options are built by applying {@code configurator} to a fresh
     * {@link DbSinkOptions}. If no SQL platform was configured, it is resolved from the
     * JDBC driver name reported by the connection. The table is then created if it is
     * missing and the frame rows are inserted. The connection obtained from the options
     * for this call is closed when the method exits.
     *
     * @param frame        the frame to persist, must not be null
     * @param configurator callback that populates the sink options, must not be null
     * @throws NullPointerException if either argument is null
     * @throws DataFrameException   if any step of the write fails
     */
    public <R, C> void write(DataFrame<R, C> frame, Consumer<DbSinkOptions<R, C>> configurator) {
        Objects.requireNonNull(frame, "DataFrame cannot be null");
        Objects.requireNonNull(configurator, "The options consumer cannot be null");
        DbSinkOptions options = (DbSinkOptions)Initialiser.apply(new DbSinkOptions(), configurator);
        try (Connection connection = options.getConnection()) {
            boolean platformConfigured = options.getPlatform().isPresent();
            if (!platformConfigured) {
                // Fall back to detecting the dialect from the JDBC driver in use.
                String driverName = connection.getMetaData().getDriverName();
                options.setPlatform(SQLPlatform.getPlatform(driverName));
            }
            this.createTable(frame, options);
            this.insertData(frame, options);
        }
        catch (Exception ex) {
            throw new DataFrameException("Failed to write DataFrame to database table " + options.getTableName(), ex);
        }
    }

    /**
     * Creates the target table when the JDBC metadata does not already report a table
     * with the configured name.
     *
     * <p>The metadata {@link ResultSet} is closed as soon as the existence check is
     * done, and a {@link Statement} is only opened when DDL actually has to be executed.
     *
     * @param frame   the frame whose columns define the table layout
     * @param options the sink options providing the connection and table name
     * @throws DataFrameException if the existence check or the DDL execution fails
     */
    private <R, C> void createTable(DataFrame<R, C> frame, DbSinkOptions<R, C> options) {
        Connection conn = options.getConnection();
        String tableName = options.getTableName();
        try {
            boolean tableExists;
            DatabaseMetaData metaData = conn.getMetaData();
            // The metadata result set is not owned by any Statement, so close it explicitly.
            try (ResultSet tables = metaData.getTables(null, null, tableName, null)) {
                tableExists = tables.next();
            }
            if (tableExists) {
                System.out.println("The table named " + tableName + " already exists");
                return;
            }
            String ddl = this.getCreateTableSql(frame, options);
            System.out.println("Executing DDL:\n " + ddl);
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate(ddl);
            }
        }
        catch (Exception ex) {
            throw new DataFrameException("Failed to create table named " + tableName + " in database", ex);
        }
    }

    /**
     * Inserts every row of the frame into the target table using a batched
     * {@link PreparedStatement}.
     *
     * <p>A batch is executed each time the running row count reaches a multiple of the
     * configured batch size, and once more at the end if a partial batch remains.
     *
     * @param frame   the frame whose rows are inserted
     * @param options the sink options providing connection, table name and batch size
     * @throws DataFrameException if preparing, binding or executing the insert fails
     */
    private <R, C> void insertData(DataFrame<R, C> frame, DbSinkOptions<R, C> options) {
        List<ColumnAdapter> adapters = this.getColumnAdapters(frame, options);
        String insertSql = this.getInsertSql(adapters, options);
        System.out.println("Insert SQL: " + insertSql);
        try (PreparedStatement stmt = options.getConnection().prepareStatement(insertSql)) {
            int rowCount = 0;
            for (DataFrameRow row : frame.rows()) {
                // JDBC parameter indexes are 1-based, hence the pre-increment.
                int stmtIndex = 0;
                for (ColumnAdapter adapter : adapters) {
                    adapter.apply(stmt, ++stmtIndex, row);
                }
                stmt.addBatch();
                rowCount++;
                if (rowCount % options.getBatchSize() == 0) {
                    System.out.println("Executing batch, row count is " + rowCount);
                    stmt.executeBatch();
                }
            }
            boolean partialBatchPending = rowCount % options.getBatchSize() != 0;
            if (partialBatchPending) {
                System.out.println("Executing final batch, row count is " + rowCount);
                stmt.executeBatch();
            }
        }
        catch (Exception ex) {
            throw new DataFrameException("Failed to insert data from DataFrame into table named " + options.getTableName(), ex);
        }
    }

    /**
     * Builds the parameterized INSERT statement for the given column adapters.
     *
     * <p>Column names are wrapped in double quotes and one {@code ?} placeholder is
     * emitted per column, in adapter order.
     *
     * @param columnList the adapters, one per column to be written
     * @param options    the sink options providing the table name
     * @return the INSERT SQL text
     */
    private <R, C> String getInsertSql(List<ColumnAdapter> columnList, DbSinkOptions<R, C> options) {
        String tableName = options.getTableName();
        StringBuilder columns = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        for (ColumnAdapter adapter : columnList) {
            if (columns.length() > 0) {
                columns.append(",");
                placeholders.append(",");
            }
            columns.append("\"").append(adapter.colName).append("\"");
            placeholders.append("?");
        }
        return String.format("INSERT INTO \"%s\" (%s) VALUES (%s)", tableName, columns, placeholders);
    }

    /**
     * Resolves one {@link ColumnAdapter} per writable column of the target table.
     *
     * <p>The table layout is read from the metadata of an empty
     * {@code select * ... where 1=2} query. The configured row-key column gets a
     * {@link RowKeyAdapter}, the configured auto-increment column is skipped, and every
     * other table column must correspond to a DataFrame column through the configured
     * column-name function.
     *
     * @param frame   the frame being written
     * @param options the sink options providing connection, table name and mappings
     * @return the adapters in table column order
     * @throws IllegalStateException if no SQL platform is set on the options
     * @throws DataFrameException    if the metadata cannot be read, or a table column has
     *                               no matching DataFrame column
     */
    private <R, C> List<ColumnAdapter> getColumnAdapters(DataFrame<R, C> frame, DbSinkOptions<R, C> options) {
        Connection conn = options.getConnection();
        String tableName = options.getTableName();
        SQLPlatform platform = options.getPlatform().orElseThrow(() -> new IllegalStateException("No SQL platform specified in options"));
        Map<Object, String> columnMap1 = frame.cols().keys().collect(Collectors.toMap(c -> c, c -> options.getColumnNames().apply(c)));
        // Reverse lookup: SQL column name -> DataFrame column key.
        Map columnMap2 = Collect.reverse(columnMap1);
        String sql = "select * from \"" + tableName + "\" where 1=2";
        try (Statement stmt = conn.createStatement();
             ResultSet resultSet = stmt.executeQuery(sql)) {
            ArrayList<ColumnAdapter> columnList = new ArrayList<ColumnAdapter>();
            ResultSetMetaData metaData = resultSet.getMetaData();
            SQLType.TypeResolver typeResolver = SQLType.getTypeResolver(platform);
            int columnCount = metaData.getColumnCount();
            // JDBC metadata column indexes are 1-based.
            for (int column = 1; column <= columnCount; ++column) {
                String sqlColName = metaData.getColumnName(column);
                int sqlTypeCode = metaData.getColumnType(column);
                String sqlTypeName = metaData.getColumnTypeName(column);
                SQLType sqlType = typeResolver.getType(sqlTypeCode, sqlTypeName);
                boolean isRowKeyColumn = options.getRowKeyColumn().map(name -> name.equals(sqlColName)).orElse(false);
                if (isRowKeyColumn) {
                    columnList.add(new RowKeyAdapter<R, C>(sqlColName, sqlType, options));
                    continue;
                }
                boolean isAutoIncrementColumn = options.getAutoIncrementColumnName().map(name -> name.equalsIgnoreCase(sqlColName)).orElse(false);
                if (isAutoIncrementColumn) {
                    // The database generates this value, so it is not part of the INSERT.
                    continue;
                }
                if (!columnMap2.containsKey(sqlColName)) {
                    // Fail with a clear message rather than passing a null key to the frame.
                    throw new IllegalStateException("No DataFrame column is mapped to SQL column \"" + sqlColName + "\" of table " + tableName);
                }
                Object colKey = columnMap2.get(sqlColName);
                Class dataType = frame.cols().type(colKey);
                DataFrameCursor cursor = frame.cursor().atColKey(colKey);
                Function1 mapper = options.getColumnMappings().getMapper(dataType);
                columnList.add(new ValueAdapter(sqlColName, sqlType, cursor, mapper));
            }
            return columnList;
        }
        catch (Exception ex) {
            throw new DataFrameException("Failed to resolve SQL column types for table " + tableName, ex);
        }
    }

    /**
     * Generates the CREATE TABLE statement for the frame.
     *
     * <p>Column definitions are emitted in this order: the optional auto-increment
     * column, the optional row-key column, then one column per DataFrame column. The
     * row-key column is declared {@code PRIMARY KEY} only when no auto-increment column
     * is configured, because the auto-increment column already carries the primary key
     * and a table cannot declare two. Definitions are joined with {@code ",\n"}, so the
     * statement stays well formed even if the frame has no columns.
     *
     * @param frame   the frame whose columns define the table layout
     * @param options the sink options providing table name, platform and mappings
     * @return the DDL text
     * @throws IllegalStateException if no SQL platform is set, or the platform has no
     *                               known auto-increment syntax
     */
    private <R, C> String getCreateTableSql(DataFrame<R, C> frame, DbSinkOptions<R, C> options) {
        SQLPlatform platform = options.getPlatform().orElseThrow(() -> new IllegalStateException("No SQL platform configured on options"));
        boolean hasAutoIncrement = options.getAutoIncrementColumnName().isPresent();
        List<String> columnDefs = new ArrayList<>();
        options.getAutoIncrementColumnName().ifPresent(colName -> {
            columnDefs.add("    \"" + colName + "\" INTEGER" + this.getAutoIncrementClause(platform));
        });
        options.getRowKeyColumn().ifPresent(colName -> {
            Class dataType = frame.rows().keyType();
            Class<?> sqlType = options.getColumnMappings().getSqlType(dataType);
            String typeInfo = this.getSqlTypeString(sqlType);
            // Only the row key becomes the primary key when nothing else already is.
            String keyClause = hasAutoIncrement ? "" : " PRIMARY KEY";
            columnDefs.add("    \"" + colName + "\" " + typeInfo + " NOT NULL" + keyClause);
        });
        frame.cols().forEach(column -> {
            Object key = column.key();
            String colName = options.getColumnNames().apply(key);
            boolean hasNull = column.hasNulls();
            Class dataType = frame.cols().type(key);
            Class<?> sqlType = options.getColumnMappings().getSqlType(dataType);
            String typeInfo = this.getSqlTypeString(sqlType);
            columnDefs.add("    \"" + colName + "\" " + typeInfo + (hasNull ? " NULL" : " NOT NULL"));
        });
        return "CREATE TABLE \"" + options.getTableName() + "\" (\n" + String.join(",\n", columnDefs) + "\n)";
    }

    /**
     * Returns the platform-specific DDL suffix that turns an INTEGER column into an
     * auto-incrementing primary key.
     *
     * @param platform the SQL platform the DDL targets
     * @return the clause to append after {@code INTEGER}, with a leading space
     * @throws IllegalStateException if the platform is not one of the handled dialects
     */
    private String getAutoIncrementClause(SQLPlatform platform) {
        switch (platform) {
            case SQLITE:
                return " PRIMARY KEY";
            case H2:
            case MYSQL:
                return " AUTO_INCREMENT PRIMARY KEY";
            case HSQL:
            case GENERIC:
                return " IDENTITY PRIMARY KEY";
            case MSSQL:
                return " IDENTITY(1,1) PRIMARY KEY";
            default:
                throw new IllegalStateException("Unsupported SQL dialect: " + platform);
        }
    }

    /**
     * Returns the DDL type name used for a column holding values of the given class.
     *
     * @param sqlClass the Java class to translate
     * @return the column type as it appears in CREATE TABLE
     * @throws IllegalArgumentException if the class has no entry in {@code sqlTypeMap}
     * @throws IllegalStateException    if the mapped SQL type has no DDL name here
     */
    private String getSqlTypeString(Class<?> sqlClass) {
        SQLType sqlType = sqlTypeMap.get(sqlClass);
        if (sqlType == null) {
            throw new IllegalArgumentException("The SQL class is not a supported JDBC type: " + sqlClass);
        }
        String typeName;
        switch (sqlType) {
            case BIT:
            case BOOLEAN:
                typeName = "BIT";
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
                typeName = "INTEGER";
                break;
            case BIGINT:
                typeName = "BIGINT";
                break;
            case FLOAT:
            case DOUBLE:
            case DECIMAL:
                typeName = "DOUBLE";
                break;
            case VARCHAR:
                typeName = "VARCHAR(255)";
                break;
            case DATE:
                typeName = "DATE";
                break;
            case TIME:
                typeName = "TIME";
                break;
            case DATETIME:
                typeName = "DATETIME";
                break;
            default:
                throw new IllegalStateException("Unsupported SQL type:" + sqlType);
        }
        return typeName;
    }

    // Registers the Java classes this sink can translate into a SQL type. Primitive
    // and boxed variants are registered side by side so either form resolves.
    static {
        sqlTypeMap.put(Boolean.TYPE, SQLType.BIT);
        sqlTypeMap.put(Boolean.class, SQLType.BIT);
        sqlTypeMap.put(Integer.TYPE, SQLType.INTEGER);
        sqlTypeMap.put(Integer.class, SQLType.INTEGER);
        sqlTypeMap.put(Long.TYPE, SQLType.BIGINT);
        sqlTypeMap.put(Long.class, SQLType.BIGINT);
        // Float values are registered as DOUBLE rather than FLOAT.
        sqlTypeMap.put(Float.TYPE, SQLType.DOUBLE);
        sqlTypeMap.put(Float.class, SQLType.DOUBLE);
        sqlTypeMap.put(Double.TYPE, SQLType.DOUBLE);
        sqlTypeMap.put(Double.class, SQLType.DOUBLE);
        sqlTypeMap.put(String.class, SQLType.VARCHAR);
        // Date/time entries use the java.sql types imported by this file.
        sqlTypeMap.put(Time.class, SQLType.TIME);
        sqlTypeMap.put(Date.class, SQLType.DATE);
        sqlTypeMap.put(Timestamp.class, SQLType.DATETIME);
    }

    /**
     * Column adapter that binds the value of one DataFrame column to a statement
     * parameter, reading it through a cursor positioned on the current row.
     */
    private class ValueAdapter<R, C>
    extends ColumnAdapter<R, C> {
        private final DataFrameCursor<R, C> cursor;
        private final Function1<DataFrameValue<R, C>, ?> mapper;

        /**
         * @param colName the SQL column name
         * @param colType the SQL type of the column
         * @param cursor  cursor already positioned on the source DataFrame column
         * @param mapper  function converting the cursor value into the bound value
         */
        ValueAdapter(String colName, SQLType colType, DataFrameCursor<R, C> cursor, Function1<DataFrameValue<R, C>, ?> mapper) {
            super(colName, colType);
            this.cursor = cursor;
            this.mapper = mapper;
        }

        /**
         * Moves the cursor to the given row and binds its value, or SQL NULL when the
         * cursor reports null, at {@code stmtIndex}.
         *
         * @throws DataFrameException if the value cannot be mapped or bound
         */
        @Override
        void apply(PreparedStatement stmt, int stmtIndex, DataFrameRow<R, C> row) {
            try {
                this.cursor.atRow(row.ordinal());
                if (this.cursor.isNull()) {
                    stmt.setNull(stmtIndex, this.colType.getTypeCode());
                    return;
                }
                // The setter is chosen by the column's SQL type, not the Java value type.
                switch (this.colType) {
                    case BIT:
                    case BOOLEAN:
                        stmt.setBoolean(stmtIndex, this.mapper.applyAsBoolean(this.cursor));
                        break;
                    case TINYINT:
                    case SMALLINT:
                    case INTEGER:
                        stmt.setInt(stmtIndex, this.mapper.applyAsInt(this.cursor));
                        break;
                    case BIGINT:
                        stmt.setLong(stmtIndex, this.mapper.applyAsLong(this.cursor));
                        break;
                    case FLOAT:
                    case DOUBLE:
                    case DECIMAL:
                        stmt.setDouble(stmtIndex, this.mapper.applyAsDouble(this.cursor));
                        break;
                    case VARCHAR:
                        stmt.setString(stmtIndex, (String)this.mapper.apply(this.cursor));
                        break;
                    case DATE:
                        stmt.setDate(stmtIndex, (Date)this.mapper.apply(this.cursor));
                        break;
                    case TIME:
                        stmt.setTime(stmtIndex, (Time)this.mapper.apply(this.cursor));
                        break;
                    case DATETIME:
                        stmt.setTimestamp(stmtIndex, (Timestamp)this.mapper.apply(this.cursor));
                        break;
                    default:
                        throw new IllegalStateException("Unsupported column type:" + this.colType);
                }
            }
            catch (Exception ex) {
                String coordinates = String.format("(%s, %s)", this.cursor.rowKey(), this.cursor.colKey());
                throw new DataFrameException("Failed to apply value to SQL statement at " + coordinates, ex);
            }
        }
    }

    /**
     * Column adapter that binds the row key of a DataFrame row to a statement
     * parameter, using the row-key mapper and SQL class configured on the options.
     */
    private class RowKeyAdapter<R, C>
    extends ColumnAdapter<R, C> {
        private final SQLType rowKeyType;
        private final Class<?> rowKeyClass;
        private final Function1<R, ?> rowKeyMapper;

        /**
         * @param colName the SQL column that stores the row key
         * @param colType the SQL type reported for that column
         * @param options the sink options holding the row-key mapper and SQL class
         * @throws IllegalStateException    if the mapper or SQL class is not configured
         * @throws IllegalArgumentException if the SQL class has no entry in {@code sqlTypeMap}
         */
        RowKeyAdapter(String colName, SQLType colType, DbSinkOptions<R, C> options) {
            super(colName, colType);
            this.rowKeyMapper = options.getRowKeyMapper().orElseThrow(() -> new IllegalStateException("No mapper specified for row key mapping: " + colName));
            Class<?> sqlClass = options.getRowKeySqlClass().orElseThrow(() -> new IllegalStateException("No SQL type specified for row key mapping: " + colName));
            this.rowKeyClass = sqlClass;
            SQLType resolvedType = sqlTypeMap.get(sqlClass);
            if (resolvedType == null) {
                throw new IllegalArgumentException("The specified type is not a supported JDBC type: " + sqlClass);
            }
            this.rowKeyType = resolvedType;
        }

        /**
         * Maps the key of the given row and binds it at {@code stmtIndex}.
         *
         * @throws DataFrameException if the key cannot be mapped or bound
         */
        @Override
        void apply(PreparedStatement stmt, int stmtIndex, DataFrameRow<R, C> row) {
            Object rowKey = row.key();
            try {
                // The setter follows the configured row-key SQL class, not the column's reported type.
                switch (this.rowKeyType) {
                    case BIT:
                    case BOOLEAN:
                        stmt.setBoolean(stmtIndex, this.rowKeyMapper.applyAsBoolean(rowKey));
                        break;
                    case TINYINT:
                    case SMALLINT:
                    case INTEGER:
                        stmt.setInt(stmtIndex, this.rowKeyMapper.applyAsInt(rowKey));
                        break;
                    case BIGINT:
                        stmt.setLong(stmtIndex, this.rowKeyMapper.applyAsLong(rowKey));
                        break;
                    case FLOAT:
                    case DOUBLE:
                    case DECIMAL:
                        stmt.setDouble(stmtIndex, this.rowKeyMapper.applyAsDouble(rowKey));
                        break;
                    case VARCHAR:
                        stmt.setString(stmtIndex, (String)this.rowKeyMapper.apply(rowKey));
                        break;
                    case DATE:
                        stmt.setDate(stmtIndex, (Date)this.rowKeyMapper.apply(rowKey));
                        break;
                    case TIME:
                        stmt.setTime(stmtIndex, (Time)this.rowKeyMapper.apply(rowKey));
                        break;
                    case DATETIME:
                        stmt.setTimestamp(stmtIndex, (Timestamp)this.rowKeyMapper.apply(rowKey));
                        break;
                    default:
                        throw new IllegalStateException("Unsupported column type:" + this.rowKeyType);
                }
            }
            catch (Exception ex) {
                throw new DataFrameException("Failed to apply row key to SQL statement at " + rowKey, ex);
            }
        }
    }

    /**
     * Base type for objects that bind one value of a DataFrame row to one parameter of
     * a {@link PreparedStatement}.
     */
    private abstract class ColumnAdapter<R, C> {
        /** Name of the SQL column this adapter writes to. */
        final String colName;
        /** SQL type of the column this adapter writes to. */
        final SQLType colType;

        /**
         * @param colName the SQL column name
         * @param colType the SQL type of the column
         */
        ColumnAdapter(String colName, SQLType colType) {
            this.colName = colName;
            this.colType = colType;
        }

        @Override
        public String toString() {
            return String.format("ColumnAdapter{type=%s, colName=%s}", this.colType, this.colName);
        }

        /**
         * Binds this adapter's value for the given row to the statement.
         *
         * @param stmt      the statement to bind into
         * @param stmtIndex the 1-based parameter index to bind
         * @param row       the DataFrame row supplying the value
         */
        abstract void apply(PreparedStatement stmt, int stmtIndex, DataFrameRow<R, C> row);
    }
}

