/*
 * Decompiled with CFR 0.152.
 */
package edu.umd.cloud9.example.memcached.demo;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class SetLogProbInMemcached {
    private static String getListOfIpAddresses(String inputFile) {
        String ipAddresses = "";
        String port = "11211";
        try {
            String line;
            BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(inputFile)));
            while ((line = in.readLine()) != null) {
                if (line.equals("")) continue;
                String temp = line + ":" + port;
                if (ipAddresses.equals("")) {
                    ipAddresses = temp;
                    continue;
                }
                ipAddresses = ipAddresses + " " + temp;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return ipAddresses;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.out.println(" usage : [path of ip address file] [path of sequence file on dfs ] [num mappers]");
            System.exit(1);
        }
        String pathOfIpAddressFile = args[0];
        String inputPathSeqFile = args[1];
        String ipAddress = SetLogProbInMemcached.getListOfIpAddresses(pathOfIpAddressFile);
        if (ipAddress.equals("")) {
            System.out.println("List of Memcache servers IP Addresses not available");
            System.exit(1);
        } else {
            System.out.println("List of IP addresses : " + ipAddress);
        }
        String extraPath = "/tmp";
        MemcachedClient myMCC = new MemcachedClient(AddrUtil.getAddresses((String)ipAddress));
        myMCC.flush();
        myMCC.shutdown();
        int mapTasks = Integer.parseInt(args[2]);
        int reduceTasks = 0;
        JobConf conf = new JobConf(SetLogProbInMemcached.class);
        conf.setJobName("SetLogProbInMemcached");
        conf.set("ADDRESSES", ipAddress);
        conf.setNumMapTasks(mapTasks);
        conf.setNumReduceTasks(reduceTasks);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{new Path(inputPathSeqFile)});
        conf.setInputFormat(SequenceFileInputFormat.class);
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)new Path(extraPath));
        conf.setMapperClass(MyMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        Path outputDir = new Path(extraPath);
        FileSystem.get((Configuration)conf).delete(outputDir, true);
        System.out.println("getting: " + conf.get("ADDRESSES"));
        JobClient.runJob((JobConf)conf);
    }

    private static class MyMapper
    extends MapReduceBase
    implements Mapper<Text, FloatWritable, Text, FloatWritable> {
        MemcachedClient memcachedClient;

        private MyMapper() {
        }

        public void configure(JobConf conf) {
            try {
                this.memcachedClient = new MemcachedClient(AddrUtil.getAddresses((String)conf.get("ADDRESSES")));
                Thread.sleep(500L);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void map(Text text, FloatWritable value, OutputCollector<Text, FloatWritable> output, Reporter reporter) throws IOException {
            if (text.toString().length() > 100) {
                return;
            }
            String obj = Float.valueOf(value.get()).toString();
            this.memcachedClient.set(text.toString(), 72000, (Object)obj);
            try {
                Thread.sleep(1L);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void close() {
            this.memcachedClient.shutdown();
        }
    }
}

