/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Context;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.dimensions.DimensionsEvent;
import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class JDBCDimensionalOutputOperator
extends AbstractPassThruTransactionableStoreOutputOperator<DimensionsEvent.Aggregate, JdbcTransactionalStore> {
    protected static int DEFAULT_BATCH_SIZE = 1000;
    @Min(value=1L)
    private int batchSize;
    private final List<DimensionsEvent.Aggregate> tuples;
    private transient int batchStartIdx = 0;
    @NotNull
    private Map<Integer, Map<String, String>> tableNames;
    @NotNull
    private String eventSchema;
    @NotNull
    private AggregatorRegistry aggregatorRegistry = AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY;
    private DimensionalConfigurationSchema schema;
    private transient Map<Integer, Map<Integer, PreparedStatement>> ddIDToAggIDToStatement = Maps.newHashMap();
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcTransactionableOutputOperator.class);

    public JDBCDimensionalOutputOperator() {
        this.tuples = Lists.newArrayList();
        this.batchSize = DEFAULT_BATCH_SIZE;
        this.store = new JdbcTransactionalStore();
    }

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        LOG.info("Done setting up super");
        this.aggregatorRegistry.setup();
        this.schema = new DimensionalConfigurationSchema(this.eventSchema, this.aggregatorRegistry);
        List<FieldsDescriptor> keyFDs = this.schema.getDimensionsDescriptorIDToKeyDescriptor();
        for (int ddID = 0; ddID < keyFDs.size(); ++ddID) {
            LOG.info("ddID {}", (Object)ddID);
            FieldsDescriptor keyFD = keyFDs.get(ddID);
            Int2ObjectMap<FieldsDescriptor> aggIDToAggFD = this.schema.getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID);
            HashMap aggIDToStatement = this.ddIDToAggIDToStatement.get(ddID);
            if (aggIDToStatement == null) {
                aggIDToStatement = Maps.newHashMap();
                this.ddIDToAggIDToStatement.put(ddID, aggIDToStatement);
            }
            for (Map.Entry<String, String> aggTable : this.tableNames.get(ddID).entrySet()) {
                int aggID = this.aggregatorRegistry.getIncrementalAggregatorNameToID().get(aggTable.getKey());
                LOG.info("aggID {}", (Object)aggID);
                FieldsDescriptor aggFD = (FieldsDescriptor)aggIDToAggFD.get(aggID);
                List<String> keyNames = keyFD.getFieldList();
                keyNames.remove("timeBucket");
                LOG.info("List fields {}", keyNames);
                List<String> aggregateNames = aggFD.getFieldList();
                LOG.info("List fields {}", aggregateNames);
                String tableName = aggTable.getValue();
                String statementString = this.buildStatement(tableName, keyNames, aggregateNames);
                try {
                    aggIDToStatement.put(aggID, ((JdbcTransactionalStore)this.store).getConnection().prepareStatement(statementString));
                }
                catch (SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }

    private String buildStatement(String tableName, List<String> keyNames, List<String> aggregateNames) {
        LOG.info("building statement");
        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO ");
        sb.append(tableName);
        sb.append(" (");
        this.addList(sb, keyNames);
        sb.append(",");
        this.addList(sb, aggregateNames);
        sb.append(") VALUES (");
        int qCounter = 0;
        while (true) {
            sb.append("?");
            if (qCounter == keyNames.size() + aggregateNames.size() - 1) break;
            sb.append(",");
            ++qCounter;
        }
        sb.append(") ON DUPLICATE KEY UPDATE ");
        this.addOnDuplicate(sb, aggregateNames);
        return sb.toString();
    }

    private void addOnDuplicate(StringBuilder sb, List<String> names) {
        LOG.info("add Duplicate");
        int index = 0;
        while (true) {
            String name = names.get(index);
            sb.append(name);
            sb.append("=");
            sb.append("VALUES(");
            sb.append(name);
            sb.append(")");
            if (index == names.size() - 1) break;
            sb.append(",");
            ++index;
        }
    }

    private void addList(StringBuilder sb, List<String> names) {
        int index = 0;
        while (true) {
            sb.append(names.get(index));
            if (index == names.size() - 1) break;
            sb.append(",");
            ++index;
        }
    }

    public void setTableNames(Map<Integer, Map<String, String>> tableNames) {
        this.tableNames = (Map)Preconditions.checkNotNull(tableNames);
    }

    public void setEventSchema(String eventSchema) {
        this.eventSchema = eventSchema;
    }

    public void setAggregatorRegistry(AggregatorRegistry aggregatorRegistry) {
        this.aggregatorRegistry = aggregatorRegistry;
    }

    @Override
    public void endWindow() {
        if (this.tuples.size() - this.batchStartIdx > 0) {
            this.processBatch();
        }
        super.endWindow();
        this.tuples.clear();
        this.batchStartIdx = 0;
    }

    @Override
    public void processTuple(DimensionsEvent.Aggregate tuple) {
        this.tuples.add(tuple);
        if (this.tuples.size() - this.batchStartIdx >= this.batchSize) {
            this.processBatch();
        }
    }

    private void processBatch() {
        LOG.info("start {} end {}", (Object)this.batchStartIdx, (Object)this.tuples.size());
        try {
            for (int i = this.batchStartIdx; i < this.tuples.size(); ++i) {
                this.setStatementParameters(this.tuples.get(i));
            }
            for (Map.Entry<Integer, Map<Integer, PreparedStatement>> ddIDToAggIDToStatementEntry : this.ddIDToAggIDToStatement.entrySet()) {
                for (Map.Entry<Integer, PreparedStatement> entry : ddIDToAggIDToStatementEntry.getValue().entrySet()) {
                    entry.getValue().executeBatch();
                    entry.getValue().clearBatch();
                }
            }
        }
        catch (SQLException e) {
            throw new RuntimeException("processing batch", e);
        }
        finally {
            this.batchStartIdx += this.tuples.size() - this.batchStartIdx;
        }
    }

    private void setStatementParameters(DimensionsEvent.Aggregate aggregate) {
        DimensionsEvent.EventKey eventKey = aggregate.getEventKey();
        int ddID = eventKey.getDimensionDescriptorID();
        int aggID = eventKey.getAggregatorID();
        LOG.info("Setting statement params {} {}", (Object)ddID, (Object)aggID);
        FieldsDescriptor keyFD = this.schema.getDimensionsDescriptorIDToKeyDescriptor().get(ddID);
        FieldsDescriptor aggFD = (FieldsDescriptor)this.schema.getDimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor().get(ddID).get(aggID);
        GPOMutable key = eventKey.getKey();
        key.setFieldDescriptor(keyFD);
        GPOMutable value = aggregate.getAggregates();
        value.setFieldDescriptor(aggFD);
        int qCounter = 1;
        PreparedStatement ps = this.ddIDToAggIDToStatement.get(ddID).get(aggID);
        try {
            qCounter = this.setParams(ps, key, qCounter, true);
            this.setParams(ps, value, qCounter, false);
            ps.addBatch();
        }
        catch (SQLException ex) {
            throw new RuntimeException(ex);
        }
    }

    private int setParams(PreparedStatement ps, GPOMutable gpo, int qCounter, boolean isKey) throws SQLException {
        FieldsDescriptor fd = gpo.getFieldDescriptor();
        Map<String, Type> fieldToType = fd.getFieldToType();
        List<String> fields = fd.getFieldList();
        int fieldCounter = 0;
        while (fieldCounter < fields.size()) {
            String fieldName = fields.get(fieldCounter);
            if (fieldName.equals("timeBucket")) {
                --qCounter;
            } else {
                Type type = fieldToType.get(fieldName);
                LOG.info("Field Name {} {}", (Object)fieldName, (Object)qCounter);
                switch (type) {
                    case BOOLEAN: {
                        ps.setByte(qCounter, (byte)(gpo.getFieldBool(fieldName) ? 1 : 0));
                        break;
                    }
                    case BYTE: {
                        ps.setByte(qCounter, gpo.getFieldByte(fieldName));
                        break;
                    }
                    case CHAR: {
                        ps.setString(qCounter, Character.toString(gpo.getFieldChar(fieldName)));
                        break;
                    }
                    case STRING: {
                        ps.setString(qCounter, gpo.getFieldString(fieldName));
                        break;
                    }
                    case SHORT: {
                        ps.setInt(qCounter, gpo.getFieldShort(fieldName));
                        break;
                    }
                    case INTEGER: {
                        ps.setInt(qCounter, gpo.getFieldInt(fieldName));
                        break;
                    }
                    case LONG: {
                        ps.setLong(qCounter, gpo.getFieldLong(fieldName));
                        break;
                    }
                    case FLOAT: {
                        ps.setFloat(qCounter, gpo.getFieldFloat(fieldName));
                        break;
                    }
                    case DOUBLE: {
                        ps.setDouble(qCounter, gpo.getFieldDouble(fieldName));
                        break;
                    }
                    default: {
                        throw new UnsupportedOperationException("The type: " + type + " is not supported.");
                    }
                }
            }
            ++fieldCounter;
            ++qCounter;
        }
        return qCounter;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }
}

