/*
 * Decompiled with CFR 0.152.
 */
package azkaban.executor;

import azkaban.db.DatabaseOperator;
import azkaban.db.EncodingType;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionReference;
import azkaban.executor.Executor;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.utils.GZIPUtils;
import azkaban.utils.Pair;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.log4j.Logger;

@Singleton
public class FetchActiveFlowDao {
    private static final Logger logger = Logger.getLogger(FetchActiveFlowDao.class);
    private final DatabaseOperator dbOperator;

    @Inject
    public FetchActiveFlowDao(DatabaseOperator dbOperator) {
        this.dbOperator = dbOperator;
    }

    Map<Integer, Pair<ExecutionReference, ExecutableFlow>> fetchActiveFlows() throws ExecutorManagerException {
        try {
            return (Map)this.dbOperator.query(FetchActiveExecutableFlows.FETCH_ACTIVE_EXECUTABLE_FLOW, (ResultSetHandler)new FetchActiveExecutableFlows(), new Object[0]);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    @VisibleForTesting
    static class FetchActiveExecutableFlows
    implements ResultSetHandler<Map<Integer, Pair<ExecutionReference, ExecutableFlow>>> {
        private static final String FETCH_ACTIVE_EXECUTABLE_FLOW = "SELECT ex.exec_id exec_id, ex.enc_type enc_type, ex.flow_data flow_data, et.host host, et.port port, ex.executor_id executorId, et.active executorStatus FROM execution_flows ex LEFT JOIN  executors et ON ex.executor_id = et.id Where ex.status NOT IN (" + Status.SUCCEEDED.getNumVal() + ", " + Status.KILLED.getNumVal() + ", " + Status.FAILED.getNumVal() + ")";

        FetchActiveExecutableFlows() {
        }

        public Map<Integer, Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyMap();
            }
            HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>> execFlows = new HashMap<Integer, Pair<ExecutionReference, ExecutableFlow>>();
            do {
                int id = rs.getInt(1);
                int encodingType = rs.getInt(2);
                byte[] data = rs.getBytes(3);
                String host = rs.getString(4);
                int port = rs.getInt(5);
                int executorId = rs.getInt(6);
                boolean executorStatus = rs.getBoolean(7);
                if (data == null) {
                    logger.warn((Object)("Execution id " + id + " has flow_data=null. To clean up, update status to FAILED manually, eg. SET status = " + Status.FAILED.getNumVal() + " WHERE id = " + id));
                    continue;
                }
                EncodingType encType = EncodingType.fromInteger((int)encodingType);
                try {
                    Executor executor;
                    ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(GZIPUtils.transformBytesToObject(data, encType));
                    if (host == null) {
                        logger.warn((Object)("Executor id " + executorId + " (on execution " + id + ") wasn't found"));
                        executor = null;
                    } else {
                        executor = new Executor(executorId, host, port, executorStatus);
                    }
                    ExecutionReference ref = new ExecutionReference(id, executor);
                    execFlows.put(id, new Pair<ExecutionReference, ExecutableFlow>(ref, exFlow));
                }
                catch (IOException e) {
                    throw new SQLException("Error retrieving flow data " + id, e);
                }
            } while (rs.next());
            return execFlows;
        }
    }
}

