/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base output operator that buffers the key/value tuples received during a
 * streaming window and hands them to a key/value store when the window ends.
 *
 * <p>Tuples arrive on {@link #input} (whole maps) or {@link #inputInd} (single
 * pairs) and are accumulated in {@link #dataMap}. In {@link #endWindow()} the
 * buffered map is passed to {@link #store(Map)} between
 * {@link #startTransaction()} and {@link #commitTransaction()}, together with a
 * marker entry holding the id of the window being written. During
 * {@link #setup(Context.OperatorContext)} that marker is read back, and any
 * window whose id is not greater than the recorded one is skipped instead of
 * being written again.
 *
 * @param <K> type of the keys written to the store
 * @param <V> type of the values received on the input ports
 */
@InterfaceStability.Evolving
public abstract class AbstractKeyValueStoreOutputOperator<K, V>
extends BaseOperator {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractKeyValueStoreOutputOperator.class);

    /** Id of the window currently being processed; assigned in {@link #beginWindow(long)}. */
    protected long currentWindowId;

    /**
     * Id of the most recent window known to be committed. Starts at 0, is loaded
     * from the store in {@link #setup(Context.OperatorContext)} and advanced after
     * each successful commit in {@link #endWindow()}.
     */
    protected transient long committedWindowId = 0L;

    /** Operator id taken from the context in setup; part of the marker key. */
    private transient int operatorId;

    /** Application id taken from the context in setup; part of the marker key. */
    private transient String appId;

    /** Tuples buffered for the current window; cleared at the start of every window. */
    protected Map<K, Object> dataMap = new HashMap<K, Object>();

    /**
     * Error policy for {@link #endWindow()}: zero rethrows a failure after the
     * rollback attempt, any other value logs it as a warning and carries on.
     */
    protected int continueOnError = 0;

    /** Optional port receiving whole maps; tuples of already committed windows are dropped. */
    @InputPortFieldAnnotation(optional=true)
    public final transient DefaultInputPort<Map<K, V>> input = new DefaultInputPort<Map<K, V>>(){

        public void process(Map<K, V> t) {
            // Only buffer data for windows that have not been committed yet.
            if (AbstractKeyValueStoreOutputOperator.this.committedWindowId < AbstractKeyValueStoreOutputOperator.this.currentWindowId) {
                AbstractKeyValueStoreOutputOperator.this.process(t);
            }
        }
    };

    /** Optional port receiving single key/value pairs; tuples of already committed windows are dropped. */
    @InputPortFieldAnnotation(optional=true)
    public final transient DefaultInputPort<KeyValPair<K, V>> inputInd = new DefaultInputPort<KeyValPair<K, V>>(){

        public void process(KeyValPair<K, V> t) {
            // Only buffer data for windows that have not been committed yet.
            if (AbstractKeyValueStoreOutputOperator.this.committedWindowId < AbstractKeyValueStoreOutputOperator.this.currentWindowId) {
                AbstractKeyValueStoreOutputOperator.this.process(t.getKey(), t.getValue());
            }
        }
    };

    /**
     * Sets the error policy applied when writing a window fails.
     *
     * @param continueOnError zero to rethrow the failure, non-zero to log it and continue
     */
    public void setContinueOnError(int continueOnError) {
        this.continueOnError = continueOnError;
    }

    /**
     * Looks up a string entry in the store. Used by this class to read the
     * committed-window marker; a {@code null} result is treated as "no marker".
     *
     * @param key key of the entry to read
     * @return the stored value, or {@code null} if there is none
     */
    public abstract String get(String key);

    /**
     * Writes a string entry to the store. Used by this class to record the
     * committed-window marker inside the window's transaction.
     *
     * @param key key of the entry to write
     * @param value value to store under {@code key}
     */
    public abstract void put(String key, String value);

    /**
     * Writes the tuples buffered for one window. Called from
     * {@link #endWindow()} after {@link #startTransaction()}.
     *
     * @param data the buffered tuples of the window
     */
    public abstract void store(Map<K, Object> data);

    /** Called from {@link #endWindow()} before the window's data is written. */
    public abstract void startTransaction();

    /** Called from {@link #endWindow()} after the data and the marker were written. */
    public abstract void commitTransaction();

    /** Called from {@link #endWindow()} after writing the window failed with a runtime exception. */
    public abstract void rollbackTransaction();

    /**
     * Adds every entry of the given map to the current window's buffer.
     *
     * @param t entries to buffer
     */
    public void process(Map<K, V> t) {
        this.dataMap.putAll(t);
    }

    /**
     * Adds a single entry to the current window's buffer.
     *
     * @param key key of the entry
     * @param value value of the entry
     */
    public void process(K key, V value) {
        this.dataMap.put(key, value);
    }

    /**
     * Captures the operator and application ids and restores the id of the last
     * committed window from the store, if a marker entry exists.
     *
     * @param ctxt operator context supplying the ids
     * @throws NumberFormatException if the stored marker is not a valid long
     */
    public void setup(Context.OperatorContext ctxt) {
        this.operatorId = ctxt.getId();
        this.appId = (String)ctxt.getValue(Context.DAGContext.APPLICATION_ID);
        // The marker key depends on both ids, so it can only be read after they are set.
        String stored = this.get(this.getEndWindowKey());
        if (stored != null) {
            this.committedWindowId = Long.parseLong(stored);
        }
    }

    /**
     * Records the id of the window that is starting and empties the buffer.
     *
     * @param windowId id of the window that begins
     */
    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        this.dataMap.clear();
    }

    /**
     * Writes the buffered tuples and the window marker in one transaction, unless
     * the window is not newer than the last committed one, in which case the
     * data is discarded.
     *
     * <p>If any step throws a {@link RuntimeException}, the failure is logged, a
     * rollback is attempted (a failing rollback is logged as well), and the
     * original exception is rethrown only when {@link #continueOnError} is zero.
     */
    public void endWindow() {
        try {
            if (this.committedWindowId < this.currentWindowId) {
                this.startTransaction();
                this.store(this.dataMap);
                // The marker is written in the same transaction as the data.
                this.put(this.getEndWindowKey(), String.valueOf(this.currentWindowId));
                this.commitTransaction();
                this.committedWindowId = this.currentWindowId;
            } else {
                LOG.info("Discarding data for window id {} because committed window is {}", this.currentWindowId, this.committedWindowId);
            }
        }
        catch (RuntimeException se) {
            this.logException("Error saving data", se);
            try {
                this.rollbackTransaction();
            }
            catch (RuntimeException re) {
                // A failed rollback must not hide the original failure.
                this.logException("Error rolling back", re);
            }
            if (this.continueOnError == 0) {
                throw se;
            }
        }
    }

    /** Builds the store key under which this operator records its last committed window id. */
    private String getEndWindowKey() {
        return "_ew:" + this.appId + ":" + this.operatorId;
    }

    /**
     * Logs a failure at warn level when errors are tolerated and at error level
     * otherwise.
     *
     * @param message log message
     * @param exception the failure to log
     */
    private void logException(String message, Exception exception) {
        if (this.continueOnError != 0) {
            LOG.warn(message, exception);
        } else {
            LOG.error(message, exception);
        }
    }
}

