/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.lib.io.AbstractHttpInputOperator;
import com.sun.jersey.api.client.ClientResponse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class HttpJsonChunksInputOperator
extends AbstractHttpInputOperator<Map<String, String>> {
    private static final Logger LOG = LoggerFactory.getLogger(HttpJsonChunksInputOperator.class);

    @Override
    public void processResponse(ClientResponse response) throws IOException {
        InputStream is = (InputStream)response.getEntity(InputStream.class);
        while (true) {
            int bytesRead;
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] bytes = new byte[255];
            while ((bytesRead = is.read(bytes)) != -1) {
                LOG.debug("read {} bytes", (Object)bytesRead);
                bos.write(bytes, 0, bytesRead);
                if (is.available() != 0 || bos.size() <= 0) continue;
            }
            try {
                if (this.processBytes(bos.toByteArray())) {
                    LOG.debug("End of chunked input stream.");
                    response.close();
                    break;
                }
            }
            catch (JSONException ex) {
                LOG.error("Caught JSON error:", (Throwable)ex);
            }
            if (bytesRead == -1) {
                LOG.error("Unexpected end of chunked input stream");
                response.close();
                break;
            }
            bos.reset();
        }
    }

    private boolean processBytes(byte[] bytes) throws IOException, JSONException {
        StringBuilder chunkStr = new StringBuilder();
        List lines = IOUtils.readLines((InputStream)new ByteArrayInputStream(bytes));
        boolean endStream = false;
        int currentChunkLength = 0;
        for (String line : lines) {
            try {
                int nextLength = Integer.parseInt(line);
                if (nextLength == 0) {
                    endStream = true;
                    break;
                }
                this.processChunk(chunkStr, currentChunkLength);
                currentChunkLength = nextLength;
            }
            catch (NumberFormatException e) {
                chunkStr.append(line);
                chunkStr.append("\n");
            }
        }
        this.processChunk(chunkStr, currentChunkLength);
        return endStream;
    }

    private void processChunk(StringBuilder chunk, int expectedLength) throws JSONException {
        if (expectedLength > 0 && chunk.length() > 0) {
            JSONObject json = new JSONObject(chunk.toString());
            chunk = new StringBuilder();
            HashMap<String, String> tuple = new HashMap<String, String>();
            Iterator it = json.keys();
            while (it.hasNext()) {
                String key = (String)it.next();
                Object val = json.get(key);
                if (val == null) continue;
                String vstr = val.toString();
                tuple.put(key, vstr);
            }
            if (!tuple.isEmpty()) {
                LOG.debug("Got: " + tuple);
                this.outputPort.emit(tuple);
                chunk.setLength(0);
            }
        }
        if (this.rawOutput.isConnected()) {
            this.rawOutput.emit((Object)chunk.toString());
        }
    }
}

