/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.llap.LlapOutputFormatService;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests {@link LlapOutputFormat} against an {@link LlapOutputFormatService} that is
 * started once before the tests in this class and stopped once after them.
 *
 * <p>Each test plays the role of a client: it opens a socket to the service port,
 * sends one or more {@code LlapOutputSocketInitMessage}s carrying a fragment id, and
 * then asks the output format for a record writer using the same id.
 */
public class TestLlapOutputFormat {
    private static final Logger LOG = LoggerFactory.getLogger(TestLlapOutputFormat.class);

    /** Job configuration key under which the tests store the fragment id. */
    private static final String FRAGMENT_ID_KEY = "llap.of.id";

    /**
     * Pause, in milliseconds, between sending the init message(s) and requesting a writer.
     * NOTE(review): presumably gives the service time to process the init message before
     * the writer is looked up - confirm against LlapOutputFormatService.
     */
    private static final long INIT_MESSAGE_WAIT_MS = 3000L;

    /** Number of records written and expected back per iteration of {@link #testValues()}. */
    private static final int RECORD_COUNT = 10;

    private static LlapOutputFormatService service;

    /**
     * Starts the output format service and marks the LLAP proxy as running in a daemon.
     *
     * @throws Exception if the service cannot be initialized or started
     */
    @BeforeClass
    public static void setUp() throws Exception {
        LOG.debug("Setting up output service");
        Configuration conf = new Configuration();
        // Port 0: NOTE(review) presumably lets the service bind any free port; the tests
        // always read the actual port back through service.getPort().
        HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
        LlapOutputFormatService.initializeAndStart(conf, null);
        service = LlapOutputFormatService.get();
        LlapProxy.setDaemon(true);
        LOG.debug("Output service up");
    }

    /**
     * Stops the output format service if {@link #setUp()} managed to obtain it.
     *
     * @throws IOException if stopping the service fails
     * @throws InterruptedException if the thread is interrupted while stopping the service
     */
    @AfterClass
    public static void tearDown() throws IOException, InterruptedException {
        LOG.debug("Tearing down service");
        // setUp may have failed before assigning the service; avoid piling a
        // NullPointerException on top of the original failure.
        if (service != null) {
            service.stop();
        }
        LOG.debug("Tearing down complete");
    }

    /**
     * Writes {@value #RECORD_COUNT} text records through a record writer and verifies that
     * the same number of records can be read back from the client socket. The round trip is
     * repeated five times, each with a distinct fragment id.
     *
     * @throws Exception on any I/O or service failure
     */
    @Test
    public void testValues() throws Exception {
        JobConf job = new JobConf();
        for (int k = 0; k < 5; ++k) {
            String id = "foobar" + k;
            job.set(FRAGMENT_ID_KEY, id);
            LlapOutputFormat format = new LlapOutputFormat();
            // try-with-resources guarantees the client socket is released even when an
            // assertion or I/O error aborts the iteration.
            try (Socket socket = new Socket("localhost", service.getPort())) {
                LOG.debug("Socket connected");
                OutputStream socketStream = socket.getOutputStream();
                sendInitMessage(socketStream, id);
                socketStream.flush();
                Thread.sleep(INIT_MESSAGE_WAIT_MS);
                LOG.debug("Data written");

                RecordWriter writer = format.getRecordWriter(null, job, null, null);
                Text text = new Text();
                LOG.debug("Have record writer");
                for (int i = 0; i < RECORD_COUNT; ++i) {
                    text.set("" + i);
                    writer.write(NullWritable.get(), text);
                }
                writer.close(null);

                InputStream in = socket.getInputStream();
                LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class, job, null, null);
                LOG.debug("Have record reader");
                // NOTE(review): presumably the reader needs the done event to recognise the
                // end of input - confirm against LlapBaseRecordReader.
                reader.handleEvent(LlapBaseRecordReader.ReaderEvent.doneEvent());
                int count = 0;
                while (reader.next(NullWritable.get(), text)) {
                    LOG.debug(text.toString());
                    ++count;
                }
                reader.close();
                Assert.assertEquals((long) RECORD_COUNT, (long) count);
            }
        }
    }

    /**
     * Sends the init message twice on the same connection and verifies that requesting a
     * record writer for that fragment id fails with an {@link IOException}.
     *
     * @throws Exception on any unexpected I/O or service failure
     */
    @Test
    public void testBadClientMessage() throws Exception {
        JobConf job = new JobConf();
        String id = "foobar";
        job.set(FRAGMENT_ID_KEY, id);
        LlapOutputFormat format = new LlapOutputFormat();
        try (Socket socket = new Socket("localhost", service.getPort())) {
            LOG.debug("Socket connected");
            OutputStream socketStream = socket.getOutputStream();
            // The second, unexpected init message is what makes this client "bad".
            sendInitMessage(socketStream, id);
            sendInitMessage(socketStream, id);
            socketStream.flush();
            Thread.sleep(INIT_MESSAGE_WAIT_MS);
            LOG.debug("Data written");
            try {
                format.getRecordWriter(null, job, null, null);
                Assert.fail("Didn't throw");
            }
            catch (IOException ignored) {
                // Expected: the test passes only when getRecordWriter rejects the request.
            }
        }
    }

    /**
     * Writes one length-delimited init message carrying {@code fragmentId} to {@code out}.
     * The stream is not flushed here; callers flush once all messages are written.
     */
    private static void sendInitMessage(OutputStream out, String fragmentId) throws IOException {
        LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.newBuilder()
            .setFragmentId(fragmentId)
            .build()
            .writeDelimitedTo(out);
    }
}

