/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.hive.llap.SubmitWorkInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapBaseInputFormat<V extends WritableComparable>
implements InputFormat<NullWritable, V> {
    private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    private String url;
    private String user;
    private String pwd;
    private String query;
    public static final String URL_KEY = "llap.if.hs2.connection";
    public static final String QUERY_KEY = "llap.if.query";
    public static final String USER_KEY = "llap.if.user";
    public static final String PWD_KEY = "llap.if.pwd";
    public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
    private Connection con;
    private Statement stmt;

    public LlapBaseInputFormat(String url, String user, String pwd, String query) {
        this.url = url;
        this.user = user;
        this.pwd = pwd;
        this.query = query;
    }

    public LlapBaseInputFormat() {
    }

    public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        LlapInputSplit llapSplit = (LlapInputSplit)split;
        HiveConf.setVar((Configuration)job, (HiveConf.ConfVars)HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, (String)llapSplit.getLlapUser());
        SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes((byte[])llapSplit.getPlanBytes());
        ServiceInstance serviceInstance = this.getServiceInstance(job, llapSplit);
        String host = serviceInstance.getHost();
        int llapSubmitPort = serviceInstance.getRpcPort();
        LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort + " and outputformat port " + serviceInstance.getOutputFormatPort());
        LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = new LlapRecordReaderTaskUmbilicalExternalResponder();
        LlapTaskUmbilicalExternalClient llapClient = new LlapTaskUmbilicalExternalClient((Configuration)job, submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken(), (LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder)umbilicalResponder);
        llapClient.init((Configuration)job);
        llapClient.start();
        LlapDaemonProtocolProtos.SubmitWorkRequestProto submitWorkRequestProto = this.constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), llapClient.getAddress(), (Token<JobTokenIdentifier>)submitWorkInfo.getToken());
        TezEvent tezEvent = new TezEvent();
        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
        tezEvent.readFields((DataInput)dib);
        ArrayList tezEventList = Lists.newArrayList();
        tezEventList.add(tezEvent);
        llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, (List)tezEventList);
        String id = HiveConf.getVar((Configuration)job, (HiveConf.ConfVars)HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
        HiveConf conf = new HiveConf();
        Socket socket = new Socket(host, serviceInstance.getOutputFormatPort());
        LOG.debug("Socket connected");
        socket.getOutputStream().write(id.getBytes());
        socket.getOutputStream().write(0);
        socket.getOutputStream().flush();
        LOG.info("Registered id: " + id);
        LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class, job);
        umbilicalResponder.setRecordReader(recordReader);
        return recordReader;
    }

    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        ArrayList<LlapInputSplit> ins = new ArrayList<LlapInputSplit>();
        if (this.url == null) {
            this.url = job.get(URL_KEY);
        }
        if (this.query == null) {
            this.query = job.get(QUERY_KEY);
        }
        if (this.user == null) {
            this.user = job.get(USER_KEY);
        }
        if (this.pwd == null) {
            this.pwd = job.get(PWD_KEY);
        }
        if (this.url == null || this.query == null) {
            throw new IllegalStateException();
        }
        try {
            Class.forName(driverName);
        }
        catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
        try {
            this.con = DriverManager.getConnection(this.url, this.user, this.pwd);
            this.stmt = this.con.createStatement();
            String sql = String.format("select get_splits(\"%s\",%d)", this.query, numSplits);
            ResultSet res = this.stmt.executeQuery(sql);
            while (res.next()) {
                DataInputStream in = new DataInputStream(res.getBinaryStream(1));
                LlapInputSplit is = new LlapInputSplit();
                is.readFields((DataInput)in);
                ins.add(is);
            }
            res.close();
            this.stmt.close();
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        return ins.toArray(new InputSplit[ins.size()]);
    }

    public void close() {
        try {
            this.con.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
        String host;
        LlapRegistryService registryService = LlapRegistryService.getClient((Configuration)job);
        ServiceInstance serviceInstance = this.getServiceInstanceForHost(registryService, host = llapSplit.getLocations()[0]);
        if (serviceInstance == null) {
            throw new IOException("No service instances found for " + host + " in registry");
        }
        return serviceInstance;
    }

    private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
        InetAddress address = InetAddress.getByName(host);
        ServiceInstanceSet instanceSet = registryService.getInstances();
        ServiceInstance serviceInstance = null;
        String name = address.getHostName();
        LOG.info("Searching service instance by hostname " + name);
        serviceInstance = this.selectServiceInstance(instanceSet.getByHost(name));
        if (serviceInstance != null) {
            return serviceInstance;
        }
        name = address.getCanonicalHostName();
        LOG.info("Searching service instance by canonical hostname " + name);
        serviceInstance = this.selectServiceInstance(instanceSet.getByHost(name));
        if (serviceInstance != null) {
            return serviceInstance;
        }
        name = address.getHostAddress();
        LOG.info("Searching service instance by address " + name);
        serviceInstance = this.selectServiceInstance(instanceSet.getByHost(name));
        if (serviceInstance != null) {
            return serviceInstance;
        }
        return serviceInstance;
    }

    private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
        if (serviceInstances == null || serviceInstances.isEmpty()) {
            return null;
        }
        for (ServiceInstance serviceInstance : serviceInstances) {
            if (!serviceInstance.isAlive()) continue;
            return serviceInstance;
        }
        LOG.info("No live service instances were found");
        return null;
    }

    private LlapDaemonProtocolProtos.SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, int taskNum, InetSocketAddress address, Token<JobTokenIdentifier> token) throws IOException {
        TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
        ApplicationId appId = submitWorkInfo.getFakeAppId();
        int attemptId = taskSpec.getTaskAttemptID().getId();
        String user = System.getenv(ApplicationConstants.Environment.USER.name());
        LOG.info("Setting user in submitWorkRequest to: " + user);
        LlapDaemonProtocolProtos.SignableVertexSpec svs = Converters.convertTaskSpecToProto((TaskSpec)taskSpec, (int)attemptId, (String)appId.toString(), null, (String)user);
        ContainerId containerId = ContainerId.newInstance((ApplicationAttemptId)ApplicationAttemptId.newInstance((ApplicationId)appId, (int)0), (int)taskNum);
        Credentials taskCredentials = new Credentials();
        Credentials credentials = new Credentials();
        TokenCache.setSessionToken(token, (Credentials)credentials);
        ByteBuffer credentialsBinary = this.serializeCredentials(credentials);
        LlapDaemonProtocolProtos.FragmentRuntimeInfo.Builder runtimeInfo = LlapDaemonProtocolProtos.FragmentRuntimeInfo.newBuilder();
        runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
        runtimeInfo.setWithinDagPriority(0);
        runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
        runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
        runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
        runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
        LlapDaemonProtocolProtos.SubmitWorkRequestProto.Builder builder = LlapDaemonProtocolProtos.SubmitWorkRequestProto.newBuilder();
        builder.setWorkSpec(LlapDaemonProtocolProtos.VertexOrBinary.newBuilder().setVertex(svs).build());
        builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
        builder.setAttemptNumber(0);
        builder.setContainerIdString(containerId.toString());
        builder.setAmHost(address.getHostName());
        builder.setAmPort(address.getPort());
        builder.setCredentialsBinary(ByteString.copyFrom((ByteBuffer)credentialsBinary));
        builder.setFragmentRuntimeInfo(runtimeInfo.build());
        return builder.build();
    }

    private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
        Credentials containerCredentials = new Credentials();
        containerCredentials.addAll(credentials);
        DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
        containerCredentials.writeTokenStorageToStream((DataOutputStream)containerTokens_dob);
        return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
    }

    private static class LlapRecordReaderTaskUmbilicalExternalResponder
    implements LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder {
        protected LlapBaseRecordReader recordReader = null;
        protected LinkedBlockingQueue<LlapBaseRecordReader.ReaderEvent> queuedEvents = new LinkedBlockingQueue();

        public void submissionFailed(String fragmentId, Throwable throwable) {
            try {
                this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)("Received submission failed event for fragment ID " + fragmentId)));
            }
            catch (Exception err) {
                LOG.error("Error during heartbeat responder:", (Throwable)err);
            }
        }

        public void heartbeat(TezHeartbeatRequest request) {
            TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
            List inEvents = request.getEvents();
            for (TezEvent tezEvent : ListUtils.emptyIfNull((List)inEvents)) {
                EventType eventType = tezEvent.getEventType();
                try {
                    switch (eventType) {
                        case TASK_ATTEMPT_COMPLETED_EVENT: {
                            this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.doneEvent());
                            break;
                        }
                        case TASK_ATTEMPT_FAILED_EVENT: {
                            TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent)tezEvent.getEvent();
                            this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)taskFailedEvent.getDiagnostics()));
                            break;
                        }
                        case TASK_STATUS_UPDATE_EVENT: {
                            break;
                        }
                        default: {
                            LOG.warn("Unhandled event type " + eventType);
                            break;
                        }
                    }
                }
                catch (Exception err) {
                    LOG.error("Error during heartbeat responder:", (Throwable)err);
                }
            }
        }

        public void taskKilled(TezTaskAttemptID taskAttemptId) {
            try {
                this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)("Received task killed event for task ID " + taskAttemptId)));
            }
            catch (Exception err) {
                LOG.error("Error during heartbeat responder:", (Throwable)err);
            }
        }

        public void heartbeatTimeout(String taskAttemptId) {
            try {
                this.sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent.errorEvent((String)("Timed out waiting for heartbeat for task ID " + taskAttemptId)));
            }
            catch (Exception err) {
                LOG.error("Error during heartbeat responder:", (Throwable)err);
            }
        }

        public synchronized LlapBaseRecordReader getRecordReader() {
            return this.recordReader;
        }

        public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
            this.recordReader = recordReader;
            if (recordReader == null) {
                return;
            }
            while (!this.queuedEvents.isEmpty()) {
                LlapBaseRecordReader.ReaderEvent readerEvent = this.queuedEvents.poll();
                LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
                recordReader.handleEvent(readerEvent);
            }
        }

        protected synchronized void sendOrQueueEvent(LlapBaseRecordReader.ReaderEvent readerEvent) {
            LlapBaseRecordReader recordReader = this.getRecordReader();
            if (recordReader != null) {
                recordReader.handleEvent(readerEvent);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType() + " with message " + readerEvent.getMessage());
                }
                try {
                    this.queuedEvents.put(readerEvent);
                }
                catch (Exception err) {
                    throw new RuntimeException("Unexpected exception while queueing reader event", err);
                }
            }
        }

        public void clearQueuedEvents() {
            this.queuedEvents.clear();
        }
    }
}

