/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import com.google.protobuf.ByteString;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.SubmitWorkInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.ext.LlapDaemonInfo;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapBaseInputFormat<V extends WritableComparable<?>>
implements InputFormat<NullWritable, V> {
    private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
    // JDBC driver class loaded via Class.forName() in getSplits() before connecting.
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    // Monitor guarding every access to connectionMap (addConnection, close, closeAll).
    private static final Object lock = new Object();
    // Open JDBC connections, keyed by handle ID; getSplits() adds one entry per call.
    private static final Map<String, List<Connection>> connectionMap = new HashMap<String, List<Connection>>();
    // Connection settings; when null, getSplits() fills them from the JobConf keys below.
    private String url;
    private String user;
    private String pwd;
    private String query;
    // When true, getRecordReader() returns an Arrow batch reader instead of the base reader.
    private boolean useArrow;
    // Passed to the Arrow reader when no explicit allocator was supplied.
    private long arrowAllocatorLimit;
    // Optional caller-supplied Arrow allocator; takes precedence over arrowAllocatorLimit.
    private BufferAllocator allocator;
    // NOTE(review): not referenced anywhere in the visible part of this class.
    private final Random rand = new Random();
    // JobConf keys read by getSplits().
    public static final String URL_KEY = "llap.if.hs2.connection";
    public static final String QUERY_KEY = "llap.if.query";
    public static final String USER_KEY = "llap.if.user";
    public static final String PWD_KEY = "llap.if.pwd";
    public static final String HANDLE_ID = "llap.if.handleid";
    public static final String DB_KEY = "llap.if.database";
    public static final String USE_NEW_SPLIT_FORMAT = "llap.if.use.new.split.format";
    // Comma-separated statements run on the session before the split query; only SET statements are executed.
    public static final String SESSION_QUERIES_FOR_GET_NUM_SPLITS = "llap.session.queries.for.get.num.splits";
    // Matches "set <key>=<value>"; the flag value 2 is Pattern.CASE_INSENSITIVE.
    public static final Pattern SET_QUERY_PATTERN = Pattern.compile("^\\s*set\\s+.*=.+$", 2);
    // Format arguments: the escaped user query and the requested number of splits.
    public static final String SPLIT_QUERY = "select get_llap_splits(\"%s\",%d)";
    private static final char ESCAPE_CHAR = '\\';
    // Characters escaped in the user query before it is embedded in SPLIT_QUERY; assigned in the static initializer.
    private static final char[] escapedChars;

    /**
     * Creates an input format with explicit connection settings, so that
     * {@code getSplits} does not need to look them up in the job configuration.
     *
     * @param url   JDBC URL used to obtain the splits
     * @param user  user name for the JDBC connection
     * @param pwd   password for the JDBC connection
     * @param query query whose result the splits will cover
     */
    public LlapBaseInputFormat(String url, String user, String pwd, String query) {
        this.query = query;
        this.pwd = pwd;
        this.user = user;
        this.url = url;
    }

    /**
     * Creates an input format that records whether to use Arrow readers and the
     * allocator limit handed to them.
     *
     * @param useArrow            whether record readers should be Arrow batch readers
     * @param arrowAllocatorLimit limit passed to the Arrow reader when no allocator is set
     */
    public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
        this.arrowAllocatorLimit = arrowAllocatorLimit;
        this.useArrow = useArrow;
    }

    /**
     * Creates an input format that records whether to use Arrow readers and the
     * allocator handed to them.
     *
     * @param useArrow  whether record readers should be Arrow batch readers
     * @param allocator allocator passed to the Arrow reader
     */
    public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) {
        this.allocator = allocator;
        this.useArrow = useArrow;
    }

    /**
     * Creates an input format that does not use Arrow and leaves all connection
     * settings unset, to be read from the job configuration by {@code getSplits}.
     */
    public LlapBaseInputFormat() {
        // Arrow is opt-in through the other constructors.
        this.useArrow = false;
    }

    /**
     * Submits the split's fragment to the first LLAP daemon listed in the split,
     * registers on that daemon's output port and returns a reader over the
     * resulting socket stream.
     *
     * <p>The socket and the umbilical client are handed to the returned reader's
     * constructor as {@code Closeable}s. If anything fails before the reader is
     * returned, both are closed here so that they are not leaked; failures while
     * closing are attached to the original exception as suppressed.
     *
     * @param split    the split to read; must be an {@link LlapInputSplit}
     * @param job      job configuration; the LLAP registry user is set on it
     * @param reporter not used
     * @return an Arrow batch reader when Arrow is enabled, otherwise a base record reader
     * @throws IOException if the token cannot be read, the request cannot be
     *                     submitted, or the output socket cannot be set up
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // Token and the reader are used as raw types, as before
    public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        LlapInputSplit llapSplit = (LlapInputSplit) split;
        HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
        SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());

        LlapDaemonInfo llapDaemonInfo = llapSplit.getLlapDaemonInfos()[0];
        String host = llapDaemonInfo.getHost();
        int outputPort = llapDaemonInfo.getOutputFormatPort();
        int llapSubmitPort = llapDaemonInfo.getRpcPort();
        LOG.info("Will try to submit request to first Llap Daemon in the split - {}", llapDaemonInfo);

        // The LLAP token is optional; deserialize it only when the split carries one.
        byte[] llapTokenBytes = llapSplit.getTokenBytes();
        Token llapToken = null;
        if (llapTokenBytes != null) {
            DataInputBuffer in = new DataInputBuffer();
            in.reset(llapTokenBytes, 0, llapTokenBytes.length);
            llapToken = new Token();
            llapToken.readFields(in);
        }

        LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = new LlapRecordReaderTaskUmbilicalExternalResponder();
        LlapTaskUmbilicalExternalClient llapClient = new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken(), umbilicalResponder, llapToken);
        Socket socket = null;
        try {
            // Prefer the task/attempt numbers of the surrounding MR task attempt, if any.
            int attemptNum = 0;
            int taskNum;
            TaskAttemptID taskAttemptId = TaskAttemptID.forName(job.get("mapreduce.task.attempt.id"));
            if (taskAttemptId != null) {
                attemptNum = taskAttemptId.getId();
                taskNum = taskAttemptId.getTaskID().getId();
                LOG.debug("Setting attempt number to: {}, task number to: {} from given taskAttemptId: {} in conf", attemptNum, taskNum, taskAttemptId);
            } else {
                taskNum = llapSplit.getSplitNum();
            }

            LlapDaemonProtocolProtos.SubmitWorkRequestProto request = this.constructSubmitWorkRequestProto(submitWorkInfo, taskNum, attemptNum, llapClient.getAddress(), (Token<JobTokenIdentifier>) submitWorkInfo.getToken(), llapSplit, job);
            LlapDaemonProtocolProtos.SignableVertexSpec vertex = LlapDaemonProtocolProtos.SignableVertexSpec.parseFrom(submitWorkInfo.getVertexBinary());
            String fragmentId = Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber()).toString();
            LOG.info("Submitting fragment:{} to llap [host = {}, port = {}] ", fragmentId, host, llapSubmitPort);
            llapClient.submitWork(request, host, llapSubmitPort);

            // Register on the daemon's output port so the fragment's output is streamed to this socket.
            socket = new Socket(host, outputPort);
            OutputStream socketStream = socket.getOutputStream();
            LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.Builder builder = LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId);
            if (llapTokenBytes != null) {
                builder.setToken(ByteString.copyFrom(llapTokenBytes));
            }
            LOG.info("Registering fragment:{} to llap [host = {}, output port = {}] to read output", fragmentId, host, outputPort);
            builder.build().writeDelimitedTo(socketStream);
            socketStream.flush();
            LOG.info("Registered id: {}", fragmentId);

            LlapBaseRecordReader recordReader;
            if (!this.useArrow) {
                recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), BytesWritable.class, job, (Closeable) llapClient, (Closeable) socket);
            } else if (this.allocator != null) {
                recordReader = new LlapArrowBatchRecordReader(socket.getInputStream(), llapSplit.getSchema(), ArrowWrapperWritable.class, job, (Closeable) llapClient, socket, this.allocator);
            } else {
                recordReader = new LlapArrowBatchRecordReader(socket.getInputStream(), llapSplit.getSchema(), ArrowWrapperWritable.class, job, (Closeable) llapClient, socket, this.arrowAllocatorLimit);
            }
            // Hands over any events that arrived before the reader existed.
            umbilicalResponder.setRecordReader(recordReader);
            return recordReader;
        } catch (IOException | RuntimeException e) {
            // No reader was handed out, so nobody else will close these resources.
            closeAfterFailure(socket, e);
            closeAfterFailure((Closeable) llapClient, e);
            throw e;
        }
    }

    /**
     * Closes {@code resource} (if non-null) while unwinding from {@code failure};
     * any exception raised by the close is attached as suppressed, not thrown.
     */
    private static void closeAfterFailure(Closeable resource, Throwable failure) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception closeError) {
            failure.addSuppressed(closeError);
        }
    }

    /**
     * Obtains the LLAP splits for the configured query by running
     * {@code select get_llap_splits("<query>", <numSplits>)} over JDBC.
     *
     * <p>URL, query, user and password that were not given to the constructor are
     * read from the job configuration. On success the JDBC connection is kept open
     * and registered under the handle ID ({@link #HANDLE_ID}, or a generated UUID
     * when absent); it is released by {@link #close(String)} or {@link #closeAll()}.
     * On failure the connection is closed before the exception propagates.
     *
     * @param job       job configuration supplying connection settings and options
     * @param numSplits requested number of splits, passed through to the split query
     * @return the splits read from the result set
     * @throws IllegalStateException if no URL or no query is available
     * @throws IOException           if the driver cannot be loaded or any JDBC or
     *                               deserialization step fails (original exception as cause)
     */
    @Override
    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        List<LlapInputSplit> ins = new ArrayList<LlapInputSplit>();
        if (this.url == null) {
            this.url = job.get(URL_KEY);
        }
        if (this.query == null) {
            this.query = job.get(QUERY_KEY);
        }
        if (this.user == null) {
            this.user = job.get(USER_KEY);
        }
        if (this.pwd == null) {
            this.pwd = job.get(PWD_KEY);
        }
        String database = job.get(DB_KEY);
        if (this.url == null || this.query == null) {
            throw new IllegalStateException("Both " + URL_KEY + " and " + QUERY_KEY + " must be set (url set: " + (this.url != null) + ", query set: " + (this.query != null) + ")");
        }
        String handleId = job.get(HANDLE_ID);
        if (handleId == null) {
            handleId = UUID.randomUUID().toString();
            LOG.info("Handle ID not specified - generated handle ID {}", handleId);
        }
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
        LOG.info("Handle ID {}: query={}", handleId, this.query);
        String escapedQuery = StringUtils.escapeString(this.query, ESCAPE_CHAR, escapedChars);
        String sql = String.format(SPLIT_QUERY, escapedQuery, numSplits);
        try {
            Connection conn = DriverManager.getConnection(this.url, this.user, this.pwd);
            try (Statement stmt = conn.createStatement()) {
                if (database != null && !database.isEmpty()) {
                    // NOTE(review): the database name is concatenated into the statement unescaped;
                    // it comes from job configuration - confirm that source is trusted.
                    stmt.execute("USE " + database);
                }
                executeSessionQueries(stmt, job.get(SESSION_QUERIES_FOR_GET_NUM_SPLITS));
                boolean useNewSplitFormat = job.getBoolean(USE_NEW_SPLIT_FORMAT, false);
                // try-with-resources closes the result set on the failure path as well.
                try (ResultSet res = stmt.executeQuery(sql)) {
                    collectSplits(res, useNewSplitFormat, numSplits, ins);
                }
            } catch (Exception e) {
                LOG.error("Closing connection due to error", e);
                try {
                    conn.close();
                } catch (Exception closeError) {
                    // Keep the original failure as the primary exception.
                    e.addSuppressed(closeError);
                }
                throw e;
            }
            this.addConnection(handleId, conn);
        } catch (Exception e) {
            throw new IOException(e);
        }
        return ins.toArray(new InputSplit[ins.size()]);
    }

    /**
     * Runs the comma-separated session statements on {@code stmt}; statements that
     * do not match {@link #SET_QUERY_PATTERN} are skipped with a warning.
     */
    private static void executeSessionQueries(Statement stmt, String sessionQueries) throws java.sql.SQLException {
        if (sessionQueries == null || sessionQueries.trim().isEmpty()) {
            return;
        }
        for (String q : sessionQueries.trim().split(",")) {
            if (SET_QUERY_PATTERN.matcher(q).matches()) {
                LOG.debug("Executing session query: {}", q);
                stmt.execute(q);
                continue;
            }
            LOG.warn("Only SET queries are allowed, not executing this query: {}", q);
        }
    }

    /**
     * Deserializes one split per result row (column 2) into {@code ins}.
     *
     * <p>In the new format every row is a complete split. In the old format row 0
     * carries the schema (and is itself returned only when {@code numSplits == 0}),
     * row 1 carries the plan bytes, and each later row is a split that receives
     * the schema and plan bytes from those first two rows.
     */
    private static void collectSplits(ResultSet res, boolean useNewSplitFormat, int numSplits, List<LlapInputSplit> ins) throws java.sql.SQLException, IOException {
        int count = 0;
        LlapInputSplit schemaSplit = null;
        LlapInputSplit planSplit = null;
        while (res.next()) {
            DataInputStream in = new DataInputStream(res.getBinaryStream(2));
            LlapInputSplit is = new LlapInputSplit();
            is.readFields(in);
            if (useNewSplitFormat) {
                ins.add(is);
                continue;
            }
            if (count == 0) {
                schemaSplit = is;
                if (numSplits == 0) {
                    ins.add(schemaSplit);
                }
            } else if (count == 1) {
                planSplit = is;
            } else {
                is.setSchema(schemaSplit.getSchema());
                assert (planSplit != null);
                is.setPlanBytes(planSplit.getPlanBytes());
                ins.add(is);
            }
            ++count;
        }
    }

    /**
     * Registers {@code connection} under {@code handleId} so that it can later be
     * closed through {@code close(handleId)} or {@code closeAll()}.
     *
     * @param handleId   handle the connection belongs to
     * @param connection connection to track
     */
    private void addConnection(String handleId, Connection connection) {
        synchronized (lock) {
            connectionMap.computeIfAbsent(handleId, key -> new ArrayList<Connection>()).add(connection);
        }
    }

    /**
     * Closes and forgets every connection registered under {@code handleId}.
     * The map entry is removed while holding the lock; the connections are closed
     * after the lock has been released.
     *
     * @param handleId handle whose connections should be closed
     * @throws IOException declared for callers; not thrown by the code in this method
     */
    public static void close(String handleId) throws IOException {
        final List<Connection> removed;
        synchronized (lock) {
            removed = connectionMap.remove(handleId);
        }
        closeConnections(handleId, removed);
    }

    /**
     * Closes each connection in {@code handleConnections}, logging (not
     * propagating) any exception raised by an individual close.
     *
     * @param handleId          handle ID, used only in log messages
     * @param handleConnections connections to close; may be {@code null}
     */
    private static void closeConnections(String handleId, List<Connection> handleConnections) {
        if (handleConnections == null) {
            LOG.debug("No connection found for handle ID {}", handleId);
            return;
        }
        LOG.debug("Closing {} connections for handle ID {}", handleConnections.size(), handleId);
        for (Connection connection : handleConnections) {
            try {
                connection.close();
            } catch (Exception err) {
                // Best effort: keep closing the remaining connections.
                LOG.error("Error while closing connection for " + handleId, (Throwable) err);
            }
        }
    }

    /**
     * Closes the connections of every registered handle and empties the registry.
     * Runs entirely under the registry lock; also invoked from the shutdown hook
     * installed by the static initializer.
     */
    public static void closeAll() {
        LOG.debug("Closing all handles");
        synchronized (lock) {
            for (Iterator<Map.Entry<String, List<Connection>>> it = connectionMap.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<String, List<Connection>> entry = it.next();
                closeConnections(entry.getKey(), entry.getValue());
                // Drop each handle right after its connections have been closed.
                it.remove();
            }
        }
    }

    /**
     * Builds the submit-work request for one fragment of an external client query.
     *
     * <p>The optional parts of the request (vertex signature, initial-event
     * signature and JWT) are set only when the corresponding value is non-null,
     * because generated protobuf setters reject {@code null}.
     *
     * @param submitWorkInfo plan-level information (vertex binary/signature, app id, timing)
     * @param taskNum        fragment number; also used as the container number
     * @param attemptNum     attempt number; also used as the application attempt number
     * @param address        address reported to the daemon as AM host/port
     * @param token          session token serialized into the request credentials
     * @param llapInputSplit split supplying fragment bytes, their signature and the JWT
     * @param job            job configuration used to resolve the AM host name
     * @return the fully built request, flagged as an external client request
     * @throws IOException if the credentials cannot be serialized
     */
    private LlapDaemonProtocolProtos.SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, int taskNum, int attemptNum, InetSocketAddress address, Token<JobTokenIdentifier> token, LlapInputSplit llapInputSplit, JobConf job) throws IOException {
        byte[] fragmentBytes = llapInputSplit.getFragmentBytes();
        byte[] fragmentBytesSignature = llapInputSplit.getFragmentBytesSignature();
        ApplicationId appId = submitWorkInfo.getFakeAppId();
        // The user is only logged here; it is not placed on the request by this method.
        String user = System.getenv(ApplicationConstants.Environment.USER.name());
        LOG.info("Setting user in submitWorkRequest to: {}", user);
        ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, attemptNum), taskNum);

        Credentials credentials = new Credentials();
        TokenCache.setSessionToken(token, credentials);
        ByteBuffer credentialsBinary = this.serializeCredentials(credentials);

        LlapDaemonProtocolProtos.FragmentRuntimeInfo.Builder runtimeInfo = LlapDaemonProtocolProtos.FragmentRuntimeInfo.newBuilder();
        runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
        runtimeInfo.setWithinDagPriority(0);
        runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
        runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
        runtimeInfo.setNumSelfAndUpstreamTasks(submitWorkInfo.getVertexParallelism());
        runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);

        LlapDaemonProtocolProtos.SubmitWorkRequestProto.Builder builder = LlapDaemonProtocolProtos.SubmitWorkRequestProto.newBuilder();
        LlapDaemonProtocolProtos.VertexOrBinary.Builder vertexBuilder = LlapDaemonProtocolProtos.VertexOrBinary.newBuilder();
        vertexBuilder.setVertexBinary(ByteString.copyFrom(submitWorkInfo.getVertexBinary()));
        if (submitWorkInfo.getVertexSignature() != null) {
            builder.setWorkSpecSignature(ByteString.copyFrom(submitWorkInfo.getVertexSignature()));
        }
        builder.setWorkSpec(vertexBuilder.build());
        builder.setFragmentNumber(taskNum);
        builder.setAttemptNumber(attemptNum);
        builder.setContainerIdString(containerId.toString());
        builder.setAmHost(LlapUtil.getAmHostNameFromAddress(address, job));
        builder.setAmPort(address.getPort());
        builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
        builder.setFragmentRuntimeInfo(runtimeInfo.build());
        builder.setInitialEventBytes(ByteString.copyFrom(fragmentBytes));
        if (fragmentBytesSignature != null) {
            builder.setInitialEventSignature(ByteString.copyFrom(fragmentBytesSignature));
        }
        // Guarded like the signatures above: a split without a JWT must not fail with an NPE.
        if (llapInputSplit.getJwt() != null) {
            builder.setJwt(llapInputSplit.getJwt());
        }
        builder.setIsExternalClientRequest(true);
        return builder.build();
    }

    /**
     * Serializes a copy of {@code credentials} in token-storage format.
     *
     * @param credentials credentials to serialize; the instance itself is not modified
     * @return a buffer wrapping the written bytes (offset 0, length = bytes written)
     * @throws IOException if writing the token storage fails
     */
    private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
        Credentials copy = new Credentials();
        copy.addAll(credentials);
        DataOutputBuffer buffer = new DataOutputBuffer();
        copy.writeTokenStorageToStream(buffer);
        // The backing array may be larger than the data written, so bound the view by getLength().
        return ByteBuffer.wrap(buffer.getData(), 0, buffer.getLength());
    }

    static {
        // Close every tracked JDBC connection when the JVM shuts down.
        final Runnable closeAllHook = LlapBaseInputFormat::closeAll;
        ShutdownHookManager.addShutdownHook(closeAllHook);
        // Characters escaped in the user query before it is embedded in SPLIT_QUERY.
        escapedChars = new char[]{'"', '\\'};
    }

    private static class LlapRecordReaderTaskUmbilicalExternalResponder
    implements LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder {
        protected LlapBaseRecordReader<?> recordReader = null;
        protected LinkedBlockingQueue<LlapBaseRecordReader.ReaderEvent> queuedEvents = new LinkedBlockingQueue();

        public void submissionFailed(String fragmentId, Throwable throwable) {
            try {
                this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)("Received submission failed event for fragment ID " + fragmentId + ": " + throwable.toString())));
            }
            catch (Exception err) {
                LOG.error("Error during heartbeat responder:", (Throwable)err);
            }
        }

        public void heartbeat(TezHeartbeatRequest request) {
            List inEvents = request.getEvents();
            for (TezEvent tezEvent : ListUtils.emptyIfNull((List)inEvents)) {
                EventType eventType = tezEvent.getEventType();
                try {
                    switch (eventType) {
                        case TASK_ATTEMPT_COMPLETED_EVENT: {
                            this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.doneEvent());
                            break;
                        }
                        case TASK_ATTEMPT_FAILED_EVENT: {
                            TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent)tezEvent.getEvent();
                            this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)taskFailedEvent.getDiagnostics()));
                            break;
                        }
                        case TASK_STATUS_UPDATE_EVENT: {
                            break;
                        }
                        default: {
                            LOG.warn("Unhandled event type " + eventType);
                            break;
                        }
                    }
                }
                catch (Exception err) {
                    LOG.error("Error during heartbeat responder:", (Throwable)err);
                }
            }
        }

        public void taskKilled(TezTaskAttemptID taskAttemptId) {
            try {
                this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)("Received task killed event for task ID " + taskAttemptId)));
            }
            catch (Exception err) {
                LOG.error("Error during heartbeat responder:", (Throwable)err);
            }
        }

        public void heartbeatTimeout(String taskAttemptId) {
            try {
                this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)("Timed out waiting for heartbeat for task ID " + taskAttemptId)));
            }
            catch (Exception err) {
                LOG.error("Error during heartbeat responder:", (Throwable)err);
            }
        }

        public synchronized LlapBaseRecordReader<?> getRecordReader() {
            return this.recordReader;
        }

        public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
            this.recordReader = recordReader;
            if (recordReader == null) {
                return;
            }
            while (!this.queuedEvents.isEmpty()) {
                LlapBaseRecordReader.ReaderEvent readerEvent = this.queuedEvents.poll();
                LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
                recordReader.handleEvent(readerEvent);
            }
        }

        protected synchronized void sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent readerEvent) {
            LlapBaseRecordReader<?> recordReader = this.getRecordReader();
            if (recordReader != null) {
                recordReader.handleEvent(readerEvent);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType() + " with message " + readerEvent.getMessage());
                }
                try {
                    this.queuedEvents.put(readerEvent);
                }
                catch (Exception err) {
                    throw new RuntimeException("Unexpected exception while queueing reader event", err);
                }
            }
        }

        public void clearQueuedEvents() {
            this.queuedEvents.clear();
        }
    }
}

