/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.join;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.join.JoinStore;
import com.datatorrent.lib.join.TimeEvent;
import com.datatorrent.lib.join.TimeEventImpl;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Locale;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Base class for operators that join the tuples arriving on two input ports.
 *
 * <p>Every tuple received on {@link #input1} (the "left" side) or {@link #input2} (the "right"
 * side) is wrapped in a {@link TimeEvent}, offered to the {@link JoinStore} of its own side and,
 * when that store accepts it, joined against the tuples the store of the opposite side returns
 * for it. The joined rows of one incoming tuple are emitted together as a single list on
 * {@link #outputPort}.
 *
 * <p>For the outer join strategies, {@link #endWindow()} additionally emits the tuples that the
 * relevant store reports as unmatched.
 *
 * <p>Configuration strings:
 * <ul>
 *   <li>include fields: {@code "<left fields, comma separated>;<right fields, comma separated>"}</li>
 *   <li>key fields: {@code "<left key>,<right key>"}</li>
 *   <li>time fields (optional): {@code "<left time field>,<right time field>"}</li>
 * </ul>
 *
 * @param <T> type of the tuples received on the input ports and of the joined output rows
 */
@InterfaceStability.Unstable
public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener {
    /** Joined rows per second for the current window; recomputed in {@link #endWindow()}. */
    @AutoMetric
    private long tuplesJoinedPerSec;
    /** Length of one application window in seconds; computed in {@link #setup}. */
    private double windowTimeSec;
    /** Number of joined rows emitted since the last {@link #beginWindow(long)}. */
    protected int tuplesCount;
    /** Emits, per joined input tuple (or per unmatched flush), the list of joined rows. */
    public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<List<T>>();
    /** Join strategy in effect; defaults to an inner join. */
    protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;
    /** Side of the tuple currently being processed: {@code true} for input1, {@code false} for input2. */
    protected boolean isLeft;
    /** Left-side input port. */
    @InputPortFieldAnnotation
    public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>() {

        public void process(T tuple) {
            AbstractJoinOperator.this.isLeft = true;
            AbstractJoinOperator.this.processTuple(tuple);
        }
    };
    /** Right-side input port. */
    @InputPortFieldAnnotation
    public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>() {

        public void process(T tuple) {
            AbstractJoinOperator.this.isLeft = false;
            AbstractJoinOperator.this.processTuple(tuple);
        }
    };
    /** Store and field configuration of the left side. */
    @NotNull
    protected StoreContext leftStore;
    /** Store and field configuration of the right side. */
    @NotNull
    protected StoreContext rightStore;
    private String includeFieldStr;
    private String keyFieldStr;
    private String timeFieldStr;

    /**
     * Configures both stores for the selected strategy, sets them up, parses the field
     * configuration strings and computes the application window length in seconds.
     *
     * @param context operator context supplying the window attributes
     * @throws IllegalArgumentException if the include/key/time field strings are missing or malformed
     */
    public void setup(Context.OperatorContext context) {
        this.leftStore.getStore().isOuterJoin(this.emitsUnmatchedLeft());
        this.rightStore.getStore().isOuterJoin(this.emitsUnmatchedRight());
        this.leftStore.getStore().setup((Context)context);
        this.rightStore.getStore().setup((Context)context);
        this.populateFields();
        int windowCount = (Integer)context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT);
        int windowSizeMillis = (Integer)context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
        // Widen before multiplying so a large count * size cannot overflow int.
        this.windowTimeSec = (long)windowCount * windowSizeMillis / 1000.0;
    }

    /**
     * Stores the tuple in the store of the side it arrived on (as indicated by {@link #isLeft})
     * and, if the store accepts it, joins it against the opposite side.
     *
     * @param tuple tuple received on one of the input ports
     */
    protected void processTuple(T tuple) {
        JoinStore ownStore = this.isLeft ? this.leftStore.getStore() : this.rightStore.getStore();
        TimeEvent event = this.createEvent(tuple);
        if (ownStore.put(event)) {
            this.join(event, this.isLeft);
        }
    }

    /** Parses all field configuration strings; the time fields are optional. */
    private void populateFields() {
        this.populateIncludeFields();
        this.populateKeyFields();
        if (this.timeFieldStr != null) {
            this.populateTimeFields();
        }
    }

    /**
     * Splits the include-field string into its left and right halves (separated by {@code ';'})
     * and each half into field names (separated by {@code ','}).
     */
    private void populateIncludeFields() {
        String[] sides = splitSides(this.includeFieldStr, ";", "includeFields");
        // The original guarded this with an assert, which is a no-op unless the JVM runs with -ea.
        if (sides.length != 2) {
            throw new IllegalArgumentException("includeFields must contain exactly two ';'-separated parts but was: " + this.includeFieldStr);
        }
        this.leftStore.setIncludeFields(sides[0].split(","));
        this.rightStore.setIncludeFields(sides[1].split(","));
    }

    /**
     * Joins {@code tuple} against the opposite store, or flushes unmatched tuples when
     * {@code tuple} is {@code null}, and emits the resulting rows as one list.
     *
     * @param tuple  event to join, or {@code null} to emit the unmatched tuples of a store
     * @param isLeft {@code true} if {@code tuple} belongs to the left side; for a {@code null}
     *               tuple, {@code false} selects the left store's unmatched tuples and
     *               {@code true} the right store's
     */
    private void join(TimeEvent tuple, boolean isLeft) {
        JoinStore oppositeStore = isLeft ? this.rightStore.getStore() : this.leftStore.getStore();
        Object candidates = tuple != null ? oppositeStore.getValidTuples(tuple) : oppositeStore.getUnMatchedTuples();
        if (candidates == null) {
            return;
        }
        // Accept any List implementation rather than insisting on ArrayList.
        @SuppressWarnings("unchecked")
        List<TimeEvent> partners = (List<TimeEvent>)candidates;
        // Loop-invariant: the value of the driving tuple (null when flushing unmatched tuples).
        Object tupleValue = tuple != null ? tuple.getValue() : null;
        List<T> result = new ArrayList<T>(partners.size());
        for (TimeEvent partner : partners) {
            T output = this.createOutputTuple();
            this.copyValue(output, tupleValue, isLeft);
            this.copyValue(output, partner.getValue(), !isLeft);
            result.add(output);
            partner.setMatch(true);
        }
        if (!result.isEmpty()) {
            // Only flag the driving tuple as matched when it actually joined with something;
            // an empty candidate list must leave it eligible for the outer-join flush.
            if (tuple != null) {
                tuple.setMatch(true);
            }
            this.outputPort.emit(result);
            this.tuplesCount += result.size();
        }
    }

    /**
     * Emits the unmatched tuples required by the outer join strategies, notifies both stores
     * that the window ended and publishes the joined-rows-per-second metric.
     */
    public void endWindow() {
        if (this.emitsUnmatchedLeft()) {
            this.join(null, false);
        }
        if (this.emitsUnmatchedRight()) {
            this.join(null, true);
        }
        this.leftStore.getStore().endWindow();
        this.rightStore.getStore().endWindow();
        // Guard against a zero window length, which would otherwise yield Long.MAX_VALUE.
        this.tuplesJoinedPerSec = this.windowTimeSec > 0.0 ? (long)((double)this.tuplesCount / this.windowTimeSec) : 0L;
    }

    /**
     * Resets the per-window counters.
     *
     * @param windowId id of the window that begins
     */
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.tuplesJoinedPerSec = 0L;
        this.tuplesCount = 0;
    }

    /**
     * Forwards the checkpoint notification to both stores.
     *
     * @param windowId id of the checkpointed window
     */
    public void checkpointed(long windowId) {
        this.leftStore.getStore().checkpointed(windowId);
        this.rightStore.getStore().checkpointed(windowId);
    }

    /**
     * Forwards the commit notification to both stores.
     *
     * @param windowId id of the committed window
     */
    public void committed(long windowId) {
        this.leftStore.getStore().committed(windowId);
        this.rightStore.getStore().committed(windowId);
    }

    /**
     * Wraps a tuple in a {@link TimeEvent} using the key (and, if configured, time) field of the
     * side indicated by {@link #isLeft}. Without a time field the current system time is used.
     *
     * @param tuple tuple to wrap
     * @return event carrying the tuple's key, its time in milliseconds and the tuple itself
     */
    protected TimeEvent createEvent(Object tuple) {
        StoreContext side = this.isLeft ? this.leftStore : this.rightStore;
        String timeField = side.getTimeFields();
        Object key = this.getKeyValue(side.getKeys(), tuple);
        if (timeField != null) {
            return new TimeEventImpl(key, (Long)this.getTime(timeField, tuple), tuple);
        }
        return new TimeEventImpl(key, System.currentTimeMillis(), tuple);
    }

    /** Splits the key-field string into the left and right key (separated by {@code ','}). */
    private void populateKeyFields() {
        String[] keys = splitSides(this.keyFieldStr, ",", "keyFields");
        this.leftStore.setKeys(keys[0]);
        this.rightStore.setKeys(keys[1]);
    }

    /** @return the join strategy in effect */
    public JoinStrategy getStrategy() {
        return this.strategy;
    }

    /** @param strategy join strategy to use */
    public void setStrategy(JoinStrategy strategy) {
        this.strategy = strategy;
    }

    /** @param lStore store backing the left side; wrapped in a new {@link StoreContext} */
    public void setLeftStore(@NotNull JoinStore lStore) {
        this.leftStore = new StoreContext(lStore);
    }

    /** @param rStore store backing the right side; wrapped in a new {@link StoreContext} */
    public void setRightStore(@NotNull JoinStore rStore) {
        this.rightStore = new StoreContext(rStore);
    }

    /** @param keyFieldStr key fields as {@code "<left key>,<right key>"} */
    public void setKeyFields(String keyFieldStr) {
        this.keyFieldStr = keyFieldStr;
    }

    /** @param timeFieldStr time fields as {@code "<left time field>,<right time field>"}, or {@code null} */
    public void setTimeFieldStr(String timeFieldStr) {
        this.timeFieldStr = timeFieldStr;
    }

    /** @param includeFieldStr include fields as {@code "<left fields>;<right fields>"}, each comma separated */
    public void setIncludeFields(String includeFieldStr) {
        this.includeFieldStr = includeFieldStr;
    }

    /** @return store context of the left side */
    public StoreContext getLeftStore() {
        return this.leftStore;
    }

    /** @return store context of the right side */
    public StoreContext getRightStore() {
        return this.rightStore;
    }

    /** @return the raw include-field configuration string */
    public String getIncludeFieldStr() {
        return this.includeFieldStr;
    }

    /** @return the raw key-field configuration string */
    public String getKeyFieldStr() {
        return this.keyFieldStr;
    }

    /** @return the raw time-field configuration string, or {@code null} if none was set */
    public String getTimeFieldStr() {
        return this.timeFieldStr;
    }

    /** Splits the time-field string into the left and right time field (separated by {@code ','}). */
    private void populateTimeFields() {
        String[] timeFields = splitSides(this.timeFieldStr, ",", "timeFieldStr");
        this.leftStore.setTimeFields(timeFields[0]);
        this.rightStore.setTimeFields(timeFields[1]);
    }

    /**
     * Sets the join strategy from its name, ignoring case.
     *
     * @param policy name of a {@link JoinStrategy} constant, e.g. {@code "inner_join"}
     * @throws IllegalArgumentException if {@code policy} does not name a strategy
     */
    public void setStrategy(String policy) {
        // Locale.ROOT keeps the mapping stable under locales with special casing rules (e.g. Turkish 'i').
        this.strategy = JoinStrategy.valueOf(policy.toUpperCase(Locale.ROOT));
    }

    /** @return {@code true} if unmatched left tuples have to be emitted (left outer or full outer join) */
    private boolean emitsUnmatchedLeft() {
        return this.strategy == JoinStrategy.LEFT_OUTER_JOIN || this.strategy == JoinStrategy.OUTER_JOIN;
    }

    /** @return {@code true} if unmatched right tuples have to be emitted (right outer or full outer join) */
    private boolean emitsUnmatchedRight() {
        return this.strategy == JoinStrategy.RIGHT_OUTER_JOIN || this.strategy == JoinStrategy.OUTER_JOIN;
    }

    /**
     * Splits a two-sided configuration string and verifies that both sides are present.
     *
     * @param spec      configured value
     * @param separator separator between the left and the right part
     * @param property  property name used in the error message
     * @return the parts of {@code spec}; index 0 is the left side, index 1 the right side
     * @throws IllegalArgumentException if {@code spec} is {@code null} or has fewer than two parts
     */
    private static String[] splitSides(String spec, String separator, String property) {
        if (spec == null) {
            throw new IllegalArgumentException(property + " must be set as \"<left>" + separator + "<right>\"");
        }
        String[] sides = spec.split(separator);
        if (sides.length < 2) {
            throw new IllegalArgumentException(property + " must have the form \"<left>" + separator + "<right>\" but was: " + spec);
        }
        return sides;
    }

    /**
     * Creates an empty output row that {@link #copyValue} subsequently fills.
     *
     * @return new output row
     */
    protected abstract T createOutputTuple();

    /**
     * Copies the values of one side into an output row.
     *
     * @param output       row to populate
     * @param extractValue tuple value to copy from; {@code null} for the missing side of an
     *                     unmatched outer-join row
     * @param isLeft       {@code true} if {@code extractValue} belongs to the left side
     */
    protected abstract void copyValue(T output, Object extractValue, boolean isLeft);

    /**
     * Extracts the join key from a tuple.
     *
     * @param keyField key field configured for the tuple's side
     * @param tuple    tuple to read
     * @return the key value
     */
    protected abstract Object getKeyValue(String keyField, Object tuple);

    /**
     * Extracts the event time from a tuple.
     *
     * @param field time field configured for the tuple's side
     * @param tuple tuple to read
     * @return the time; {@link #createEvent(Object)} casts the result to {@link Long}
     */
    protected abstract Object getTime(String field, Object tuple);

    /** Bundles the {@link JoinStore} of one side with the field configuration parsed for it. */
    public static class StoreContext {
        private transient String timeFields;
        private transient String[] includeFields;
        private transient String keys;
        private JoinStore store;

        /** @param store store backing this side */
        public StoreContext(JoinStore store) {
            this.store = store;
        }

        /** @return time field of this side, or {@code null} if none was configured */
        public String getTimeFields() {
            return this.timeFields;
        }

        /** @param timeFields time field of this side */
        public void setTimeFields(String timeFields) {
            this.timeFields = timeFields;
        }

        /** @return fields of this side to include in the output */
        public String[] getIncludeFields() {
            return this.includeFields;
        }

        /** @param includeFields fields of this side to include in the output */
        public void setIncludeFields(String[] includeFields) {
            this.includeFields = includeFields;
        }

        /** @return key field of this side */
        public String getKeys() {
            return this.keys;
        }

        /** @param keys key field of this side */
        public void setKeys(String keys) {
            this.keys = keys;
        }

        /** @return store backing this side */
        public JoinStore getStore() {
            return this.store;
        }

        /** @param store store backing this side */
        public void setStore(JoinStore store) {
            this.store = store;
        }
    }

    /** Supported join strategies. */
    public static enum JoinStrategy {
        INNER_JOIN,
        LEFT_OUTER_JOIN,
        RIGHT_OUTER_JOIN,
        OUTER_JOIN;

    }
}

