/*
 * Decompiled with CFR 0.152.
 */
package edu.umd.cloud9.example.memcached.demo;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class DemoMemcachedAccess {
    private static String getListOfIpAddresses(String inputFile) {
        String ipAddresses = "";
        String port = "11211";
        try {
            String line;
            BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(inputFile)));
            while ((line = in.readLine()) != null) {
                if (line.equals("")) continue;
                String temp = line + ":" + port;
                if (ipAddresses.equals("")) {
                    ipAddresses = temp;
                    continue;
                }
                ipAddresses = ipAddresses + " " + temp;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return ipAddresses;
    }

    protected DemoMemcachedAccess() {
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.out.println(" usage : [path of ip address file] [path of sequence file on hdfs] [no of Map Tasks]");
            System.exit(1);
        }
        String pathOfIpAddressFile = args[0];
        String inputPath = args[1];
        String ipAddress = DemoMemcachedAccess.getListOfIpAddresses(pathOfIpAddressFile);
        if (ipAddress.equals("")) {
            System.out.println("List of Memcache servers IP Addresses not available");
            System.exit(1);
        } else {
            System.out.println("List of IP addresses : " + ipAddress);
        }
        String extraPath = "/results";
        int mapTasks = Integer.parseInt(args[2]);
        int reduceTasks = 0;
        JobConf conf = new JobConf(DemoMemcachedAccess.class);
        conf.setJobName("DemoMemcachedAccess");
        conf.set("ADDRESSES", ipAddress);
        conf.setNumMapTasks(mapTasks);
        conf.setNumReduceTasks(reduceTasks);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{new Path(inputPath)});
        conf.setInputFormat(TextInputFormat.class);
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(FloatWritable.class);
        conf.setMapperClass(MyMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        Path outputDir = new Path(extraPath);
        FileSystem.get((Configuration)conf).delete(outputDir, true);
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)outputDir);
        long startTime = System.currentTimeMillis();
        JobClient.runJob((JobConf)conf);
        long endTime = System.currentTimeMillis();
        long diff = endTime - startTime;
        System.out.println("Total job completion time (ms): " + diff);
    }

    private static class MyMapper
    extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, FloatWritable> {
        MemcachedClient memcachedClient;

        private MyMapper() {
        }

        public void configure(JobConf conf) {
            try {
                this.memcachedClient = new MemcachedClient(AddrUtil.getAddresses((String)conf.get("ADDRESSES")));
                Thread.sleep(500L);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, OutputCollector<LongWritable, FloatWritable> output, Reporter reporter) throws IOException {
            FloatWritable totalProb = new FloatWritable();
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line);
            float sum = 0.0f;
            while (itr.hasMoreTokens()) {
                String temp = itr.nextToken();
                if (temp.toString().length() > 100) continue;
                long startTime = System.currentTimeMillis();
                Object obj = this.memcachedClient.get(temp);
                long endTime = System.currentTimeMillis();
                long diff = endTime - startTime;
                reporter.incrCounter((Enum)MyCounters.TIME, diff);
                if (obj == null) {
                    throw new RuntimeException("Error getting from memcache: key = " + temp);
                }
                sum += Float.parseFloat(obj.toString());
            }
            totalProb.set(sum);
            output.collect((Object)key, (Object)totalProb);
        }

        public void close() {
            this.memcachedClient.shutdown();
        }
    }

    static enum MyCounters {
        TIME;

    }
}

