/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.db.jdbc.AbstractJdbcNonTransactionableOutputOperator;
import com.datatorrent.lib.db.jdbc.JdbcNonTransactionalStore;
import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.List;
import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output operator base class that adds every accepted tuple to the JDBC batch of
 * {@code updateCommand} and executes that batch once {@code batchSize} tuples have
 * accumulated.
 *
 * <p>At the end of each window the window id is written to the store. Tuples that
 * arrive in a window whose id is not greater than the stored id are dropped, so
 * windows that were already recorded are not written a second time.
 *
 * @param <T> type of the tuples written by this operator
 * @param <S> type of the store used to persist the committed window id
 */
public abstract class AbstractJdbcNonTransactionableBatchOutputOperator<T, S extends JdbcNonTransactionalStore>
extends AbstractJdbcNonTransactionableOutputOperator<T, S> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcNonTransactionableBatchOutputOperator.class);

    /** Batch size used when none is configured explicitly. */
    public static final int DEFAULT_BATCH_SIZE = 1000;

    /** Number of buffered tuples that triggers execution of the JDBC batch. */
    @Min(value=1L)
    private int batchSize = DEFAULT_BATCH_SIZE;

    /** Tuples added to the JDBC batch since it was last executed. */
    private final List<T> tuples = Lists.newArrayList();

    /** Processing mode; overwritten from the operator context in {@link #setup}. */
    private Operator.ProcessingMode mode;

    /** Id of the window currently being processed. */
    private long currentWindowId;

    /** Last window id recorded in the store; re-read from the store in {@link #setup}. */
    private transient long committedWindowId;

    /** Application id taken from the operator context in {@link #setup}. */
    private transient String appId;

    /** Operator id taken from the operator context in {@link #setup}. */
    private transient int operatorId;

    /**
     * Sets the number of tuples that are collected before the batch is executed.
     *
     * @param batchSize the batch size; the field is constrained to be at least 1
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * @return the number of tuples that are collected before the batch is executed
     */
    public int getBatchSize() {
        return this.batchSize;
    }

    /**
     * Sets the processing mode. Note that {@link #setup} replaces this value with the
     * one found in the operator context.
     *
     * @param mode the processing mode
     */
    public void setMode(Operator.ProcessingMode mode) {
        this.mode = mode;
    }

    /**
     * @return the processing mode currently held by this operator
     */
    public Operator.ProcessingMode getMode() {
        return this.mode;
    }

    /**
     * @return the application id read from the operator context during {@link #setup}
     */
    public String getAppId() {
        return this.appId;
    }

    /**
     * @return the operator id read from the operator context during {@link #setup}
     */
    public int getOperatorId() {
        return this.operatorId;
    }

    /**
     * Initializes the operator: reads the processing mode, rebuilds the JDBC batch from
     * the tuple buffer and loads the committed window id from the store.
     *
     * @param context the operator context
     * @throws RuntimeException wrapping any {@link SQLException} raised while rebuilding the batch
     */
    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        this.mode = (Operator.ProcessingMode)context.getValue(Context.OperatorContext.PROCESSING_MODE);
        if (this.mode == Operator.ProcessingMode.AT_MOST_ONCE) {
            // In at-most-once mode buffered tuples are discarded instead of being re-batched.
            this.tuples.clear();
        }
        try {
            // Tuples already in the buffer at setup time (presumably restored with the
            // operator state - confirm) are put back into the statement's batch.
            for (T bufferedTuple : this.tuples) {
                this.setStatementParameters(this.updateCommand, bufferedTuple);
                this.updateCommand.addBatch();
            }
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
        this.appId = (String)context.getValue(DAG.APPLICATION_ID);
        this.operatorId = context.getId();
        this.committedWindowId = ((JdbcNonTransactionalStore)this.store).getCommittedWindowId(this.appId, this.operatorId);
        LOG.debug("AppId {} OperatorId {}", this.appId, this.operatorId);
        LOG.debug("Committed window id {}", this.committedWindowId);
    }

    /**
     * Records the id of the window that is starting.
     *
     * @param windowId id of the window being started
     */
    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.currentWindowId = windowId;
        LOG.debug("Committed window {}, current window {}", this.committedWindowId, this.currentWindowId);
    }

    /**
     * Writes the current window id to the store if it is newer than the committed one.
     */
    @Override
    public void endWindow() {
        super.endWindow();
        // NOTE(review): a partially filled batch (fewer than batchSize tuples) is not
        // executed here before the window id is stored - confirm that this is intended.
        if (this.committedWindowId < this.currentWindowId) {
            ((JdbcNonTransactionalStore)this.store).storeCommittedWindowId(this.appId, this.operatorId, this.currentWindowId);
            this.committedWindowId = this.currentWindowId;
        }
    }

    /**
     * Adds the tuple to the JDBC batch and executes the batch once {@code batchSize}
     * tuples have been collected. Tuples of windows that are not newer than the
     * committed window are ignored.
     *
     * @param tuple the tuple to write
     * @throws RuntimeException wrapping any {@link SQLException} raised by the statement
     */
    @Override
    public void processTuple(T tuple) {
        if (this.committedWindowId >= this.currentWindowId) {
            // This window was already recorded in the store; skip its tuples.
            return;
        }
        this.tuples.add(tuple);
        try {
            this.setStatementParameters(this.updateCommand, tuple);
            this.updateCommand.addBatch();
            if (this.tuples.size() >= this.batchSize) {
                this.updateCommand.executeBatch();
                this.updateCommand.clearBatch();
                // Forget the buffered tuples only after the batch ran, so the buffer
                // still mirrors the pending batch if execution throws.
                this.tuples.clear();
            }
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

