/*
 * Decompiled with CFR 0.152.
 */
package stream.io;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.Keys;
import stream.ProcessContext;
import stream.Processor;
import stream.ProcessorException;
import stream.annotations.Description;
import stream.annotations.Parameter;
import stream.data.DataFactory;
import stream.io.AbstractSQLProcessor;
import stream.io.sql.HsqlDialect;
import stream.io.sql.MysqlDialect;

@Description(group="Data Stream.Output")
public class SQLWriter
extends AbstractSQLProcessor {
    static Logger log = LoggerFactory.getLogger(SQLWriter.class);
    boolean dropTable = false;
    String table;
    Keys keys = new Keys("*");
    final LinkedHashSet<String> keysToStore = new LinkedHashSet();
    Map<String, Class<?>> tableSchema = null;
    transient boolean tableExists = false;
    transient long count = 0L;
    transient Connection connection = null;
    transient List<String> columns = new ArrayList<String>();

    public String getTable() {
        return this.table;
    }

    @Parameter(required=true, description="The database table to insert items into.")
    public void setTable(String table) {
        this.table = table;
    }

    @Parameter(required=false, description="A list of attributes to insert (columns), empty string for all attributes.")
    public void setKeys(Keys keys) {
        this.keys = keys;
    }

    public boolean isDropTable() {
        return this.dropTable;
    }

    @Parameter(required=false, defaultValue="false")
    public void setDropTable(boolean dropTable) {
        this.dropTable = dropTable;
    }

    public void init(ProcessContext ctx) throws Exception {
        super.init(ctx);
        if (this.table == null || this.table.trim().equals("")) {
            throw new Exception("No 'table' attribute provided!");
        }
        this.init();
        log.debug("init(ProcessContext) done.");
    }

    private void init() throws Exception {
        this.connection = this.openConnection();
        log.debug("Opened connection to {} = {}", (Object)this.getUrl(), (Object)this.connection);
        log.debug("Dialect = {} ", (Object)this.dialect);
        if (this.url.toLowerCase().startsWith("jdbc:mysql")) {
            this.dialect = new MysqlDialect();
        }
        if (this.url.toLowerCase().startsWith("jdbc:hsqldb")) {
            this.dialect = new HsqlDialect();
        }
        log.debug("Using dialect {}", (Object)this.dialect);
        if (this.dropTable) {
            log.debug("Dropping existing table '{}'", (Object)this.getTable());
            try {
                Statement stmt = this.connection.createStatement();
                int ret = stmt.executeUpdate("DROP TABLE " + this.getTable());
                log.debug("Return of DROP TABLE: {}", (Object)ret);
            }
            catch (Exception e) {
                log.error("Failed to drop table: {}", (Object)e.getMessage());
            }
        } else {
            Map<String, Class<?>> schema = this.dialect.getTableSchema(this.connection, this.getTable());
            if (schema != null) {
                log.debug("Using existing table schema: {}", schema);
                log.info("Existing schema is: {}", schema);
                if (this.tableSchema == null) {
                    this.tableSchema = new LinkedHashMap(schema);
                } else {
                    this.tableSchema.putAll(schema);
                }
                if (this.keys != null) {
                    for (String key : this.keys.getKeyValues()) {
                        if (this.tableSchema.containsKey(key)) continue;
                        log.info("Removing non-selected key '{}'", (Object)key);
                        this.tableSchema.remove(key);
                    }
                }
                log.debug("Types:\n{}", this.tableSchema);
            }
        }
    }

    @Override
    public boolean hasTable(String name) {
        if (this.tableExists) {
            return true;
        }
        this.tableExists = super.hasTable(name);
        return this.tableExists;
    }

    public Data process(Data input) {
        if (this.connection == null) {
            try {
                this.init();
            }
            catch (Exception e) {
                throw new ProcessorException((Processor)this, "Failed to initialize database connection: " + e.getMessage());
            }
        }
        if (this.tableSchema == null) {
            log.debug("No table-schema found, does table exist? {}", (Object)this.hasTable(this.getTable()));
            this.tableSchema = this.dialect.getTableSchema(this.connection, this.getTable());
            log.debug("Tried to read schema from database: {}", this.tableSchema);
            if (this.tableSchema == null) {
                log.debug("Creating new table {} from first item {}", (Object)this.getTable(), (Object)input);
                Data sample = DataFactory.create();
                Set ks = this.keys.select(input);
                for (String k : ks) {
                    sample.put((Object)k, input.get((Object)k));
                }
                Map<String, Class<?>> schema = this.dialect.getColumnTypes(sample);
                if (this.createTable(this.getTable(), schema)) {
                    this.tableSchema = schema;
                } else {
                    throw new ProcessorException((Processor)this, "Failed to create table " + this.getTable() + " for item: " + input);
                }
            }
        }
        if (!this.hasTable(this.getTable())) {
            if (this.keys != null) {
                for (String key : this.keys.select(input)) {
                    Serializable value = (Serializable)input.get((Object)key);
                    if (value == null) {
                        throw new ProcessorException((Processor)this, "Cannot determine type of key '" + key + "' for table creation! First item does not provide a value for '" + key + "'!");
                    }
                    this.tableSchema.put(key, value.getClass());
                }
            } else {
                for (String key : input.keySet()) {
                    this.tableSchema.put(key, ((Serializable)input.get((Object)key)).getClass());
                }
            }
            if (!this.createTable(this.getTable(), this.tableSchema)) {
                throw new ProcessorException((Processor)this, "Failed to create table '" + this.getTable() + "'!");
            }
            this.tableExists = true;
        }
        try {
            StringBuffer insert = new StringBuffer("INSERT INTO ");
            insert.append(this.getTable());
            insert.append(" ( ");
            StringBuffer values = new StringBuffer(" VALUES ( ");
            ArrayList<Serializable> valueObject = new ArrayList<Serializable>();
            Iterator<String> it = this.tableSchema.keySet().iterator();
            while (it.hasNext()) {
                String key = it.next();
                Serializable value = (Serializable)input.get((Object)key);
                if (value == null) continue;
                valueObject.add(value);
                insert.append(this.dialect.mapColumnName(key));
                values.append("?");
                if (!it.hasNext()) continue;
                insert.append(", ");
                values.append(", ");
            }
            insert.append(" ) ");
            values.append(" ) ");
            insert.append(values.toString());
            log.debug("INSERT statement is: {}", (Object)insert);
            PreparedStatement ps = this.connection.prepareStatement(insert.toString());
            for (int i = 0; i < valueObject.size(); ++i) {
                ps.setObject(i + 1, valueObject.get(i));
            }
            int ret = ps.executeUpdate();
            if (ret == 1) {
                ++this.count;
            }
            log.debug("INSERT retured {}", (Object)ret);
            ps.close();
        }
        catch (Exception e) {
            log.error("Failed to insert data item: {}", (Object)e.getMessage());
        }
        return input;
    }

    public void finish() throws Exception {
        super.finish();
        log.debug("Closing SQL writer, {} items written.", (Object)this.count);
        log.debug("Closing SQL connection...");
        this.connection.close();
        this.tableSchema = null;
    }
}

