/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.db;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.db.TransactionableStore;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceStability;

@InterfaceStability.Evolving
public abstract class AbstractTransactionableStoreOutputOperator<T, S extends TransactionableStore>
extends BaseOperator {
    protected S store;
    protected String appId;
    protected Integer operatorId;
    protected long currentWindowId = -1L;
    protected long committedWindowId = -1L;
    @InputPortFieldAnnotation(optional=true)
    public final transient DefaultInputPort<T> input = new DefaultInputPort<T>(){

        public void process(T t) {
            if (AbstractTransactionableStoreOutputOperator.this.committedWindowId < AbstractTransactionableStoreOutputOperator.this.currentWindowId) {
                AbstractTransactionableStoreOutputOperator.this.processTuple(t);
            }
        }
    };

    public S getStore() {
        return this.store;
    }

    public void setStore(S store) {
        this.store = store;
    }

    public void setup(Context.OperatorContext context) {
        try {
            this.store.connect();
            this.appId = (String)context.getValue(DAG.APPLICATION_ID);
            this.operatorId = context.getId();
            this.committedWindowId = this.store.getCommittedWindowId(this.appId, this.operatorId);
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
    }

    public void teardown() {
        try {
            if (this.store.isInTransaction()) {
                this.store.rollbackTransaction();
            }
            this.store.disconnect();
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    public abstract void processTuple(T var1);
}

