/*
 * Decompiled with CFR 0.152.
 */
package edu.umd.hooka.alignment;

import edu.umd.cloud9.mapred.NullInputFormat;
import edu.umd.cloud9.mapred.NullMapper;
import edu.umd.cloud9.mapred.NullOutputFormat;
import edu.umd.hooka.Alignment;
import edu.umd.hooka.AlignmentPosteriorGrid;
import edu.umd.hooka.CorpusVocabNormalizerAndNumberizer;
import edu.umd.hooka.PServer;
import edu.umd.hooka.PServerClient;
import edu.umd.hooka.PhrasePair;
import edu.umd.hooka.Vocab;
import edu.umd.hooka.VocabularyWritable;
import edu.umd.hooka.alignment.AlignmentEventListener;
import edu.umd.hooka.alignment.AlignmentModel;
import edu.umd.hooka.alignment.CrossEntropyCounters;
import edu.umd.hooka.alignment.HadoopAlignConfig;
import edu.umd.hooka.alignment.IndexedFloatArray;
import edu.umd.hooka.alignment.PartialCountContainer;
import edu.umd.hooka.alignment.PerplexityReporter;
import edu.umd.hooka.alignment.aer.ReferenceAlignment;
import edu.umd.hooka.alignment.hmm.ATable;
import edu.umd.hooka.alignment.hmm.HMM;
import edu.umd.hooka.alignment.hmm.HMM_NullWord;
import edu.umd.hooka.alignment.model1.Model1;
import edu.umd.hooka.alignment.model1.Model1_InitUniform;
import edu.umd.hooka.ttables.TTable;
import edu.umd.hooka.ttables.TTable_monolithic_IFAs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class HadoopAlign {
    /** Class logger (used, e.g., by the ModelMergeMapper2 merge step). */
    private static final Logger sLogger = Logger.getLogger(HadoopAlign.class);
    /** When true, doAlignment calls startPServers before every iteration after the first. */
    static boolean usePServer = false;
    /**
     * Job configuration key set by doAlignment to one of MODEL1_UNIFORM_INIT,
     * MODEL1_TRAINER or HMM_TRAINER; presumably read by the mappers to choose a trainer.
     */
    static final String KEY_TRAINER = "ha.trainer";
    /** Job configuration key holding the current EM iteration number (as a decimal string). */
    static final String KEY_ITERATION = "ha.model.iteration";
    /** KEY_TRAINER value used for iteration 0 (Model 1 initialisation). */
    static final String MODEL1_UNIFORM_INIT = "model1.uniform";
    /** KEY_TRAINER value used for the remaining Model 1 iterations and Model 1 alignment. */
    static final String MODEL1_TRAINER = "model1.trainer";
    /** KEY_TRAINER value used for all HMM iterations and HMM alignment. */
    static final String HMM_TRAINER = "hmm.baumwelch.trainer";
    /** Job configuration key naming the EM output directory that the merge mappers read. */
    static final String TTABLE_ITERATION_OUTPUT = "em.model-data.file";
    /** Server created by startPServers; null when none has been created (or it was stopped). */
    static PServer pserver = null;
    // Command-line option names registered and read in main().
    private static final String INPUT_OPTION = "input";
    private static final String WORK_OPTION = "workdir";
    private static final String FLANG_OPTION = "src_lang";
    private static final String ELANG_OPTION = "trg_lang";
    private static final String MODEL1_OPTION = "model1";
    private static final String HMM_OPTION = "hmm";
    private static final String REDUCE_OPTION = "reduce";
    private static final String TRUNCATE_OPTION = "use_truncate";
    private static final String LIBJARS_OPTION = "libjars";
    /** Command-line option definitions; built in main() and printed by printUsage(). */
    private static Options options;

    /**
     * Reads a serialized {@link ATable} from a file.
     *
     * @param path file containing an ATable written with {@code ATable.write}
     * @param job  configuration used to resolve the file system (copied, not modified)
     * @return the deserialized alignment table
     * @throws IOException if the file cannot be opened or read
     */
    public static ATable loadATable(Path path, Configuration job) throws IOException {
        Configuration conf = new Configuration(job);
        FileSystem fileSys = FileSystem.get(conf);
        DataInputStream in = new DataInputStream(new BufferedInputStream(fileSys.open(path)));
        // Always release the underlying HDFS stream, even if deserialization fails.
        try {
            ATable at = new ATable();
            at.readFields(in);
            return at;
        } finally {
            in.close();
        }
    }

    /**
     * Reads a serialized vocabulary ({@link VocabularyWritable}) from a file.
     *
     * @param path file containing the serialized vocabulary
     * @param job  configuration used to resolve the file system (copied, not modified)
     * @return the deserialized vocabulary
     * @throws IOException if the file cannot be opened or read
     */
    public static Vocab loadVocab(Path path, Configuration job) throws IOException {
        Configuration conf = new Configuration(job);
        FileSystem fileSys = FileSystem.get(conf);
        DataInputStream in = new DataInputStream(new BufferedInputStream(fileSys.open(path)));
        // Always release the underlying HDFS stream, even if deserialization fails.
        try {
            VocabularyWritable vocab = new VocabularyWritable();
            vocab.readFields(in);
            return vocab;
        } finally {
            in.close();
        }
    }

    /**
     * Reads a serialized vocabulary ({@link VocabularyWritable}) from a file
     * on an already-resolved file system.
     *
     * @param path    file containing the serialized vocabulary
     * @param fileSys file system holding {@code path}
     * @return the deserialized vocabulary
     * @throws IOException if the file cannot be opened or read
     */
    public static Vocab loadVocab(Path path, FileSystem fileSys) throws IOException {
        DataInputStream in = new DataInputStream(new BufferedInputStream(fileSys.open(path)));
        // Always release the underlying stream, even if deserialization fails.
        try {
            VocabularyWritable vocab = new VocabularyWritable();
            vocab.readFields(in);
            return vocab;
        } finally {
            in.close();
        }
    }

    /**
     * Computes the alignment error rate from a job's evaluation counters and
     * prints the matching precision to stdout.
     *
     * <p>AER = 1 - (probableHits + sureHits) / (hypothesizedPoints + referencePoints);
     * precision = probableHits / hypothesizedPoints. Both are expressed as
     * percentages truncated (not rounded) to two decimal places. If a
     * denominator is zero the division yields NaN, which the int cast turns into 0.
     *
     * @param c counters carrying the AlignmentEvalEnum totals
     * @return the AER as a percentage
     */
    static double ComputeAER(Counters c) {
        long sureHits = c.getCounter((Enum)AlignmentEvalEnum.SURE_HITS);
        long probableHits = c.getCounter((Enum)AlignmentEvalEnum.PROBABLE_HITS);
        long hypothesized = c.getCounter((Enum)AlignmentEvalEnum.HYPOTHESIZED_ALIGNMENT_POINTS);
        long reference = c.getCounter((Enum)AlignmentEvalEnum.REF_ALIGNMENT_POINTS);

        double matched = probableHits + sureHits;
        double total = hypothesized + reference;
        int aerHundredths = (int)((1.0 - matched / total) * 10000.0);
        int precHundredths = (int)((double)probableHits / (double)hypothesized * 10000.0);

        System.out.println("PREC: " + precHundredths / 100.0);
        return aerHundredths / 100.0;
    }

    /**
     * Creates a PServer for the configured t-table on port 4444, starts it on a
     * new thread, and then unconditionally throws: this code path is disabled.
     * The created server stays referenced by {@code pserver} so stopPServers()
     * can shut it down.
     *
     * @param hac alignment configuration supplying the file system and t-table path
     * @return never returns normally
     * @throws IOException if the file system cannot be obtained
     * @throws RuntimeException always, after starting the server
     */
    static String startPServers(HadoopAlignConfig hac) throws IOException {
        final int port = 4444;
        FileSystem serverFs = FileSystem.get((Configuration)hac);
        pserver = new PServer(port, serverFs, hac.getTTablePath());
        new Thread(pserver).start();
        throw new RuntimeException("Shouldn't use PServer");
    }

    /**
     * Stops the server created by startPServers, if any, and forgets it.
     *
     * <p>The reference is cleared even if stopping fails. This prevents the
     * per-iteration {@code finally} in doAlignment from repeatedly stopping an
     * already-stopped server.
     *
     * @throws IOException if stopping the server fails
     */
    static void stopPServers() throws IOException {
        PServer running = pserver;
        if (running == null) {
            return;
        }
        try {
            running.stopServer();
        } finally {
            pserver = null;
        }
    }

    /**
     * Runs the complete EM word-alignment pipeline on Hadoop.
     *
     * <p>The bitext is preprocessed into {@code <root>/comp-bitext} unless that
     * path already exists. Then {@code getModel1Iterations()} Model 1 iterations
     * are run, followed by {@code getHMMIterations()} HMM iterations. Each iteration:
     * <ol>
     *   <li>runs an EM job (EMapper / EMReducer) writing partial counts to
     *       {@code <root>/<Model>.data.<i>};</li>
     *   <li>runs a single-task merge job (ModelMergeMapper2) that writes those
     *       counts into the t-table / a-table files, then deletes the EM output;</li>
     *   <li>after the last Model 1 iteration and after the final iteration, runs
     *       an alignment job (AlignMapper) and records its AER.</li>
     * </ol>
     * Once the Model 1 alignment output exists, it is used as input for all later
     * jobs instead of the raw bitext. A table of AER / perplexity per recorded
     * step is printed at the end.
     *
     * <p>Decompiler note: the original try/finally structure may have been altered.
     *
     * @param mapTasks    number of map tasks requested for the EM and alignment jobs
     * @param reduceTasks number of reduce tasks for the EM and alignment jobs
     * @param hac         alignment configuration (paths, iteration counts, model options)
     * @throws IOException if HDFS access or any of the Hadoop jobs fails
     */
    public static void doAlignment(int mapTasks, int reduceTasks, HadoopAlignConfig hac) throws IOException {
        System.out.println("Running alignment: " + hac);
        FileSystem fs = FileSystem.get((Configuration)hac);
        Path cbtxt = new Path(hac.getRoot() + "/comp-bitext");
        if (!fs.exists(cbtxt)) {
            CorpusVocabNormalizerAndNumberizer.preprocessAndNumberizeFiles(hac, hac.getBitexts(), cbtxt);
        }
        System.out.println("Finished preprocessing");
        int m1iters = hac.getModel1Iterations();
        int hmmiters = hac.getHMMIterations();
        int totalIterations = m1iters + hmmiters;
        ArrayList<Double> perps = new ArrayList<Double>();
        ArrayList<Double> aers = new ArrayList<Double>();
        boolean hmm = false;
        boolean firstHmm = true;
        // Output dir of the Model 1 alignment job; once set it replaces the bitext as job input.
        Path model1PosteriorsPath = null;
        for (int iteration = 0; iteration < totalIterations; ++iteration) {
            long start = System.currentTimeMillis();
            hac.setBoolean("ha.generate.posterios", false);
            boolean lastIteration = iteration == totalIterations - 1;
            boolean lastModel1Iteration = iteration == m1iters - 1;
            if (iteration >= m1iters) {
                hmm = true;
            }
            String modelType = hmm ? "HMM" : "Model1";
            FileSystem fileSys = FileSystem.get((Configuration)hac);
            Path outputPath = new Path(modelType + ".data." + iteration);
            // Every job writes under the work root; outputPath itself is only the directory name.
            String rootedOutput = hac.getRoot() + "/" + outputPath.toString();

            // ---- E/M step: compute and normalise partial counts ----
            try {
                if (usePServer && iteration > 0) {
                    HadoopAlign.startPServers(hac);
                }
                System.out.println("Starting iteration " + iteration + (iteration == 0 ? " (initialization)" : "") + ": " + modelType);
                JobConf emConf = new JobConf((Configuration)hac, HadoopAlign.class);
                emConf.setJobName("EMTrain." + modelType + ".iter" + iteration);
                emConf.setInputFormat(SequenceFileInputFormat.class);
                emConf.set(KEY_TRAINER, MODEL1_TRAINER);
                emConf.set(KEY_ITERATION, Integer.toString(iteration));
                emConf.set("mapred.child.java.opts", "-Xmx2048m");
                if (iteration == 0) {
                    emConf.set(KEY_TRAINER, MODEL1_UNIFORM_INIT);
                }
                if (hmm) {
                    emConf.set(KEY_TRAINER, HMM_TRAINER);
                    if (firstHmm) {
                        firstHmm = false;
                        System.out.println("Writing default a-table...");
                        Path pathATable = hac.getATablePath();
                        fileSys.delete(pathATable, true);
                        int cond_values = hac.isHMMHomogeneous() ? 1 : 100;
                        ATable at = new ATable(hac.isHMMHomogeneous(), cond_values, 100);
                        at.normalize();
                        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(fileSys.create(pathATable)));
                        try {
                            at.write(dos);
                        } finally {
                            dos.close();
                        }
                    }
                }
                emConf.setOutputKeyClass(IntWritable.class);
                emConf.setOutputValueClass(PartialCountContainer.class);
                emConf.setMapperClass(EMapper.class);
                emConf.setReducerClass(EMReducer.class);
                emConf.setNumMapTasks(mapTasks);
                emConf.setNumReduceTasks(reduceTasks);
                System.out.println("Running job " + emConf.getJobName());
                if (model1PosteriorsPath != null) {
                    System.out.println("Input: " + model1PosteriorsPath);
                    FileInputFormat.setInputPaths(emConf, new Path[]{model1PosteriorsPath});
                } else {
                    System.out.println("Input: " + cbtxt);
                    FileInputFormat.setInputPaths(emConf, new Path[]{cbtxt});
                }
                System.out.println("Output: " + outputPath);
                FileOutputFormat.setOutputPath(emConf, new Path(rootedOutput));
                fileSys.delete(new Path(rootedOutput), true);
                emConf.setOutputFormat(SequenceFileOutputFormat.class);
                RunningJob emJob = JobClient.runJob(emConf);
                Counters emCounters = emJob.getCounters();
                double lp = emCounters.getCounter((Enum)CrossEntropyCounters.LOGPROB);
                double wc = emCounters.getCounter((Enum)CrossEntropyCounters.WORDCOUNT);
                double ce = lp / wc / Math.log(2.0);
                double perp = Math.pow(2.0, ce);
                double aer = HadoopAlign.ComputeAER(emCounters);
                System.out.println("Iteration " + iteration + ": (" + modelType + ")\tCROSS-ENTROPY: " + ce + "   PERPLEXITY: " + perp);
                System.out.println("Iteration " + iteration + ": " + aer + " AER");
                aers.add(aer);
                perps.add(perp);
            }
            finally {
                HadoopAlign.stopPServers();
            }

            // ---- Merge step: fold the EM output into the model files ----
            JobConf mergeConf = new JobConf((Configuration)hac, ModelMergeMapper2.class);
            System.err.println("Setting em.model-data.file to " + outputPath.toString());
            mergeConf.set(TTABLE_ITERATION_OUTPUT, rootedOutput);
            mergeConf.setJobName("EMTrain.ModelMerge");
            mergeConf.setMapperClass(ModelMergeMapper2.class);
            mergeConf.setSpeculativeExecution(false);
            mergeConf.setNumMapTasks(1);
            mergeConf.setNumReduceTasks(0);
            mergeConf.setInputFormat(NullInputFormat.class);
            mergeConf.setOutputFormat(NullOutputFormat.class);
            mergeConf.set("mapred.map.child.java.opts", "-Xmx2048m");
            mergeConf.set("mapred.reduce.child.java.opts", "-Xmx2048m");
            System.out.println("Running job " + mergeConf.getJobName());
            System.out.println("Input: " + hac.getRoot() + "/dummy");
            System.out.println("Output: " + hac.getRoot() + "/dummy.out");
            JobClient.runJob(mergeConf);
            fileSys.delete(new Path(rootedOutput), true);

            // ---- Alignment step: only after the last Model 1 and the final iteration ----
            if (lastIteration || lastModel1Iteration) {
                JobConf alignConf = new JobConf((Configuration)hac, HadoopAlign.class);
                alignConf.setJobName(modelType + ".align");
                alignConf.set("mapred.map.child.java.opts", "-Xmx2048m");
                alignConf.set("mapred.reduce.child.java.opts", "-Xmx2048m");
                alignConf.setInputFormat(SequenceFileInputFormat.class);
                alignConf.setOutputFormat(SequenceFileOutputFormat.class);
                alignConf.set(KEY_TRAINER, MODEL1_TRAINER);
                alignConf.set(KEY_ITERATION, Integer.toString(iteration));
                if (hmm) {
                    alignConf.set(KEY_TRAINER, HMM_TRAINER);
                }
                alignConf.setOutputKeyClass(Text.class);
                alignConf.setOutputValueClass(PhrasePair.class);
                alignConf.setMapperClass(AlignMapper.class);
                alignConf.setReducerClass(IdentityReducer.class);
                alignConf.setNumMapTasks(mapTasks);
                alignConf.setNumReduceTasks(reduceTasks);
                FileOutputFormat.setOutputPath(alignConf, new Path(rootedOutput));
                Path alignInput;
                if (lastModel1Iteration) {
                    alignInput = cbtxt;
                    model1PosteriorsPath = new Path(rootedOutput);
                } else {
                    // HMM-only runs never produce Model 1 posteriors; use the bitext
                    // rather than passing a null input path to the job.
                    alignInput = model1PosteriorsPath != null ? model1PosteriorsPath : cbtxt;
                }
                FileInputFormat.setInputPaths(alignConf, new Path[]{alignInput});
                // Delete the same rooted directory the job writes to (not a path relative to the user's home).
                fileSys.delete(new Path(rootedOutput), true);
                System.out.println("Running job " + alignConf.getJobName());
                RunningJob alignJob = JobClient.runJob(alignConf);
                System.out.println("GENERATED: " + model1PosteriorsPath);
                Counters alignCounters = alignJob.getCounters();
                double aer = HadoopAlign.ComputeAER(alignCounters);
                System.out.println("Iteration " + iteration + ": " + aer + " AER");
                aers.add(aer);
                perps.add(0.0);
            }
            long end = System.currentTimeMillis();
            System.out.println(modelType + " iteration " + iteration + " took " + (end - start) / 1000L + " seconds.");
        }
        for (int i = 0; i < perps.size(); ++i) {
            System.out.print("I=" + i + "\t");
            if (aers.size() > 0) {
                System.out.print(aers.get(i) + "\t");
            }
            System.out.println(perps.get(i));
        }
    }

    /** Prints the command-line help for this tool, listing every option registered in {@code options}. */
    private static void printUsage() {
        String commandName = HadoopAlign.class.getCanonicalName();
        new HelpFormatter().printHelp(commandName, options);
    }

    /**
     * Command-line entry point. Parses the options, builds a
     * {@link HadoopAlignConfig}, and runs doAlignment with 50 map tasks.
     *
     * <p>Required options: -input, -workdir, -src_lang, -trg_lang. Optional:
     * <ul>
     *   <li>-model1 and -hmm: iteration counts, default 0; at least one must be positive.</li>
     *   <li>-reduce: number of reducers, default 50.</li>
     *   <li>-use_truncate.</li>
     *   <li>-libjars: accepted by the parser, but its value is not used here.</li>
     * </ul>
     * On invalid input, an error and the usage text are printed and the method
     * returns without running anything.
     *
     * @param args command-line arguments
     * @throws IOException if the alignment pipeline fails
     */
    public static void main(String[] args) throws IOException {
        options = new Options();
        OptionBuilder.withDescription("path to XML-formatted parallel corpus");
        OptionBuilder.withArgName("path");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create(INPUT_OPTION));
        OptionBuilder.withDescription("path to work/output directory on HDFS");
        OptionBuilder.withArgName("path");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create(WORK_OPTION));
        OptionBuilder.withDescription("two-letter collection language code");
        OptionBuilder.withArgName("en|de|fr|zh|es|ar|tr");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create(FLANG_OPTION));
        OptionBuilder.withDescription("two-letter collection language code");
        OptionBuilder.withArgName("en|de|fr|zh|es|ar|tr");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired();
        options.addOption(OptionBuilder.create(ELANG_OPTION));
        OptionBuilder.withDescription("number of IBM Model 1 iterations");
        OptionBuilder.withArgName("positive integer");
        OptionBuilder.hasArg();
        options.addOption(OptionBuilder.create(MODEL1_OPTION));
        OptionBuilder.withDescription("number of HMM iterations");
        OptionBuilder.withArgName("positive integer");
        OptionBuilder.hasArg();
        options.addOption(OptionBuilder.create(HMM_OPTION));
        OptionBuilder.withDescription("truncate/stem text or not");
        options.addOption(OptionBuilder.create(TRUNCATE_OPTION));
        OptionBuilder.withDescription("number of reducers");
        OptionBuilder.withArgName("positive integer");
        OptionBuilder.hasArg();
        options.addOption(OptionBuilder.create(REDUCE_OPTION));
        OptionBuilder.withDescription("Hadoop option to load external jars");
        OptionBuilder.withArgName("jar packages");
        OptionBuilder.hasArg();
        options.addOption(OptionBuilder.create(LIBJARS_OPTION));

        GnuParser parser = new GnuParser();
        CommandLine cmdline;
        try {
            cmdline = parser.parse(options, args);
        }
        catch (ParseException exp) {
            HadoopAlign.printUsage();
            System.err.println("Error parsing command line: " + exp.getMessage());
            return;
        }
        String bitextPath = cmdline.getOptionValue(INPUT_OPTION);
        String workDir = cmdline.getOptionValue(WORK_OPTION);
        String srcLang = cmdline.getOptionValue(FLANG_OPTION);
        String trgLang = cmdline.getOptionValue(ELANG_OPTION);

        // Report malformed numbers as a usage error instead of crashing with a stack trace.
        int model1Iters;
        int hmmIters;
        int numReducers;
        try {
            model1Iters = cmdline.hasOption(MODEL1_OPTION) ? Integer.parseInt(cmdline.getOptionValue(MODEL1_OPTION)) : 0;
            hmmIters = cmdline.hasOption(HMM_OPTION) ? Integer.parseInt(cmdline.getOptionValue(HMM_OPTION)) : 0;
            numReducers = cmdline.hasOption(REDUCE_OPTION) ? Integer.parseInt(cmdline.getOptionValue(REDUCE_OPTION)) : 50;
        }
        catch (NumberFormatException exp) {
            HadoopAlign.printUsage();
            System.err.println("Error parsing command line: " + exp.getMessage());
            return;
        }
        if (model1Iters < 0 || hmmIters < 0) {
            System.err.println("Iteration counts must not be negative");
            HadoopAlign.printUsage();
            return;
        }
        if (model1Iters + hmmIters == 0) {
            System.err.println("Please enter a positive number of iterations for either Model 1 or HMM");
            HadoopAlign.printUsage();
            return;
        }
        if (numReducers <= 0) {
            System.err.println("Please enter a positive number of reducers");
            HadoopAlign.printUsage();
            return;
        }
        boolean isTruncate = cmdline.hasOption(TRUNCATE_OPTION);
        HadoopAlignConfig hac = new HadoopAlignConfig(workDir, trgLang, srcLang, bitextPath, model1Iters, hmmIters, true, false, isTruncate, 0.0f);
        hac.setHMMHomogeneous(false);
        hac.set("mapreduce.map.memory.mb", "2048");
        hac.set("mapreduce.map.java.opts", "-Xmx2048m");
        hac.set("mapreduce.reduce.memory.mb", "2048");
        hac.set("mapreduce.reduce.java.opts", "-Xmx2048m");
        hac.setHMMp0(0.2);
        hac.setMaxSentLen(15);
        HadoopAlign.doAlignment(50, numReducers, hac);
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public static class ModelMergeMapper
    extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, Text> {
        Path outputPath = null;
        Path ttablePath = null;
        Path atablePath = null;
        HadoopAlignConfig hac = null;
        JobConf xjob = null;

        public void configure(JobConf job) {
            this.xjob = job;
            this.hac = new HadoopAlignConfig((Configuration)job);
            this.ttablePath = this.hac.getTTablePath();
            this.atablePath = this.hac.getATablePath();
            this.outputPath = new Path(job.get(HadoopAlign.TTABLE_ITERATION_OUTPUT));
        }

        public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
            IntWritable k = new IntWritable();
            PartialCountContainer t = new PartialCountContainer();
            FileSystem fileSys = FileSystem.get((Configuration)this.xjob);
            fileSys.delete(this.outputPath.suffix("/_logs"), true);
            SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders((Configuration)this.xjob, (Path)this.outputPath);
            FileReaderZip z = new FileReaderZip(readers);
            TTable_monolithic_IFAs tt = new TTable_monolithic_IFAs(fileSys, this.ttablePath, false);
            boolean emittedATable = false;
            while (z.next(k, t)) {
                if (t.getType() == 3) {
                    ((TTable)tt).set(k.get(), (IndexedFloatArray)t.getContent());
                    if (k.get() % 1000 == 0) {
                        reporter.progress();
                    }
                    reporter.incrCounter((Enum)MergeCounters.EWORDS, 1L);
                    reporter.incrCounter((Enum)MergeCounters.STATISTICS, (long)(((IndexedFloatArray)t.getContent()).size() + 1));
                    continue;
                }
                if (emittedATable) {
                    throw new RuntimeException("Should only have a single ATable!");
                }
                ATable at = (ATable)t.getContent();
                fileSys.delete(this.atablePath, true);
                DataOutputStream dos = new DataOutputStream(new BufferedOutputStream((OutputStream)fileSys.create(this.atablePath)));
                at.write(dos);
                dos.close();
                emittedATable = true;
            }
            fileSys.delete(this.ttablePath, true);
            ((TTable)tt).write();
            output.collect((Object)key, (Object)value);
        }

        /*
         * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
         */
        static enum MergeCounters {
            EWORDS,
            STATISTICS;

        }
    }

    /**
     * Single-task merge step used by doAlignment (via NullMapper's run hook).
     *
     * <p>Steps:
     * <ul>
     *   <li>reads every part file under the directory named by TTABLE_ITERATION_OUTPUT;</li>
     *   <li>stores each IndexedFloatArray record (content type 3) into the t-table;</li>
     *   <li>writes the single ATable record, if present, to the a-table path;</li>
     *   <li>rewrites the t-table file.</li>
     * </ul>
     * Unlike ModelMergeMapper, files are read one after another rather than merged by key.
     */
    private static class ModelMergeMapper2
    extends NullMapper {
        private ModelMergeMapper2() {
        }

        /**
         * Performs the merge described on the class.
         *
         * @param job      configuration carrying the model paths and TTABLE_ITERATION_OUTPUT
         * @param reporter used for progress and MergeCounters updates
         * @throws IOException if reading the EM output or writing the tables fails
         * @throws RuntimeException if more than one ATable record is found
         */
        public void run(JobConf job, Reporter reporter) throws IOException {
            sLogger.setLevel(Level.INFO);
            HadoopAlignConfig hac = new HadoopAlignConfig((Configuration)job);
            Path ttablePath = hac.getTTablePath();
            Path atablePath = hac.getATablePath();
            Path outputPath = new Path(job.get(HadoopAlign.TTABLE_ITERATION_OUTPUT));
            IntWritable k = new IntWritable();
            PartialCountContainer t = new PartialCountContainer();
            FileSystem fileSys = FileSystem.get((Configuration)job);
            fileSys.delete(outputPath.suffix("/_logs"), true);
            fileSys.delete(outputPath.suffix("/_SUCCESS"), true);
            sLogger.info("Reading from " + outputPath + ", exists? " + fileSys.exists(outputPath));
            TTable_monolithic_IFAs tt = new TTable_monolithic_IFAs(fileSys, ttablePath, false);
            boolean emittedATable = false;
            FileStatus[] status = fileSys.listStatus(outputPath);
            for (int i = 0; i < status.length; ++i) {
                Path part = status[i].getPath();
                String name = part.getName();
                // Skip Hadoop hidden/bookkeeping entries (e.g. "_temporary"); they are not SequenceFiles.
                if (name.startsWith("_") || name.startsWith(".")) {
                    continue;
                }
                sLogger.info("Reading " + part + ", exists? " + fileSys.exists(part));
                SequenceFile.Reader reader = new SequenceFile.Reader((Configuration)job, SequenceFile.Reader.file(part));
                try {
                    while (reader.next((Writable)k, (Writable)t)) {
                        if (t.getType() == 3) {
                            ((TTable)tt).set(k.get(), (IndexedFloatArray)t.getContent());
                            if (k.get() % 1000 == 0) {
                                reporter.progress();
                            }
                            reporter.incrCounter((Enum)MergeCounters.EWORDS, 1L);
                            reporter.incrCounter((Enum)MergeCounters.STATISTICS, (long)(((IndexedFloatArray)t.getContent()).size() + 1));
                            continue;
                        }
                        if (emittedATable) {
                            throw new RuntimeException("Should only have a single ATable!");
                        }
                        ATable at = (ATable)t.getContent();
                        fileSys.delete(atablePath, true);
                        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(fileSys.create(atablePath)));
                        try {
                            at.write(dos);
                        } finally {
                            dos.close();
                        }
                        emittedATable = true;
                    }
                } finally {
                    reader.close();
                }
            }
            fileSys.delete(ttablePath, true);
            ((TTable)tt).write();
        }
    }

    /*
     * Hadoop counters incremented by ModelMergeMapper2 while merging model data:
     * EWORDS counts t-table rows stored, and STATISTICS adds (row size + 1) per row.
     * The constant names and order are part of the reported counter names; do not rename.
     */
    static enum MergeCounters {
        EWORDS,
        STATISTICS;

    }

    /**
     * Merges several SequenceFile readers of (IntWritable, PartialCountContainer)
     * records into one stream by repeatedly taking the record with the smallest
     * current key. If every reader is sorted by key, the merged stream is too.
     *
     * <p>The value handed out by {@link #next} stays valid until the following
     * call. The reader it came from is only advanced at that point, so reading the
     * next record into the shared container cannot clobber a value the caller is
     * still using.
     */
    public static class FileReaderZip {
        PriorityQueue<SFRComp> pq = new PriorityQueue<SFRComp>();
        /** Reader whose current record was returned by the last next(); advanced lazily. */
        private SFRComp pending = null;

        /**
         * @param files readers to merge; readers that are already empty are ignored
         * @throws IOException if reading the first record of a reader fails
         */
        public FileReaderZip(SequenceFile.Reader[] files) throws IOException {
            for (SequenceFile.Reader r : files) {
                SFRComp s = new SFRComp(r);
                if (s.isValid()) {
                    this.pq.add(s);
                }
            }
        }

        /**
         * Copies the next record (smallest key across all readers) into {@code k} / {@code v}.
         *
         * @return false once all readers are exhausted
         * @throws IOException if advancing a reader fails
         */
        boolean next(IntWritable k, PartialCountContainer v) throws IOException {
            if (this.pending != null) {
                SFRComp previous = this.pending;
                this.pending = null;
                previous.read();
                if (previous.isValid()) {
                    this.pq.add(previous);
                }
            }
            if (this.pq.isEmpty()) {
                return false;
            }
            SFRComp t = this.pq.remove();
            v.setContent(t.getValue().getContent());
            k.set(t.getKey());
            this.pending = t;
            return true;
        }

        /** One reader plus its current (key, value) record, ordered by key. */
        private static class SFRComp
        implements Comparable<SFRComp> {
            PartialCountContainer cur = new PartialCountContainer();
            IntWritable k = new IntWritable();
            SequenceFile.Reader s;
            boolean valid;

            /** Wraps {@code x} and immediately reads its first record. */
            public SFRComp(SequenceFile.Reader x) throws IOException {
                this.s = x;
                this.read();
            }

            /** Reads the next record into {@code k} / {@code cur}; clears {@code valid} at end of file. */
            public void read() throws IOException {
                this.valid = this.s.next((Writable)this.k, (Writable)this.cur);
            }

            public int getKey() {
                return this.k.get();
            }

            public boolean isValid() {
                return this.valid;
            }

            /** Orders by current key; uses explicit comparison because subtraction can overflow. */
            @Override
            public int compareTo(SFRComp o) {
                if (!this.valid) {
                    throw new RuntimeException("Shouldn't happen");
                }
                int mine = this.k.get();
                int theirs = o.k.get();
                return mine < theirs ? -1 : (mine == theirs ? 0 : 1);
            }

            public PartialCountContainer getValue() {
                return this.cur;
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public static class EMReducer
    extends MapReduceBase
    implements Reducer<IntWritable, PartialCountContainer, IntWritable, PartialCountContainer> {
        boolean variationalBayes = false;
        IntWritable oe = new IntWritable();
        PartialCountContainer pcc = new PartialCountContainer();
        float[] counts = new float[1000000];
        float alpha = 0.0f;

        public void configure(JobConf job) {
            HadoopAlignConfig hac = new HadoopAlignConfig((Configuration)job);
            this.variationalBayes = hac.useVariationalBayes();
            this.alpha = hac.getAlpha();
        }

        public void reduce(IntWritable key, Iterator<PartialCountContainer> values, OutputCollector<IntWritable, PartialCountContainer> output, Reporter reporter) throws IOException {
            int lm = 0;
            if (HMM.ACOUNT_VOC_ID.get() != key.get()) {
                while (values.hasNext()) {
                    IndexedFloatArray v = (IndexedFloatArray)values.next().getContent();
                    if (v.maxKey() + 1 > lm) {
                        Arrays.fill(this.counts, lm, v.maxKey() + 1, 0.0f);
                        lm = v.maxKey() + 1;
                    }
                    v.addTo(this.counts);
                }
                IndexedFloatArray sum = new IndexedFloatArray(this.counts, lm);
                this.pcc.setContent(sum);
            } else {
                ATable sum = null;
                while (values.hasNext()) {
                    if (sum == null) {
                        sum = (ATable)((ATable)values.next().getContent()).clone();
                        continue;
                    }
                    sum.plusEquals((ATable)values.next().getContent());
                }
                this.pcc.setContent(sum);
            }
            this.pcc.normalize(this.variationalBayes, this.alpha);
            output.collect((Object)key, (Object)this.pcc);
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public static class AlignMapper
    extends AlignmentBase
    implements Mapper<Text, PhrasePair, Text, PhrasePair> {
        boolean first = true;
        Text astr = new Text();

        /**
         * Aligns one sentence pair with the current model and passes the pair through unchanged
         * in key and (possibly updated) value.
         *
         * <p>On the first call the model is built via {@link #init()} and an {@link AEListener}
         * is registered on the trainer. If the trainer is an {@link HMM}, any Model 1 posterior
         * grid stored on the pair is handed to it before alignment. When
         * {@code generatePosteriors} is set, the pair's stored posterior grid is replaced by the
         * trainer's posteriors. When the pair carries a reference alignment, the trainer's
         * posteriors thresholded at 0.5 are scored against it and the totals are added to the
         * {@link AlignmentEvalEnum} counters.
         *
         * @param key record key, emitted unchanged
         * @param value sentence pair; its posterior grid may be replaced (see above)
         * @param output receives {@code (key, value)}
         * @param reporter receives the evaluation counters and listener notifications
         * @throws IOException if model initialisation or output fails
         */
        public void map(Text key, PhrasePair value, OutputCollector<Text, PhrasePair> output, Reporter reporter) throws IOException {
            if (this.first) {
                this.init();
                this.first = false;
                this.trainer.addAlignmentListener(new AEListener(reporter));
            }
            AlignmentPosteriorGrid model1g = value.hasAlignmentPosteriors() ? value.getAlignmentPosteriorGrid() : null;
            if (this.trainer instanceof HMM && model1g != null) {
                ((HMM)this.trainer).setModel1Posteriors(model1g);
            }
            Alignment a = this.trainer.viterbiAlign(value, new PerplexityReporter());
            ReferenceAlignment ref = (ReferenceAlignment)value.getAlignment();
            if (this.generatePosteriors) {
                // Re-read the grid after Viterbi alignment; unlike above, a null grid is passed on too.
                if (value.hasAlignmentPosteriors()) {
                    model1g = value.getAlignmentPosteriorGrid();
                }
                if (this.trainer instanceof HMM) {
                    ((HMM)this.trainer).setModel1Posteriors(model1g);
                }
                value.setAlignmentPosteriorGrid(this.trainer.computeAlignmentPosteriors(value));
            }
            if (ref != null) {
                // Score the posterior-thresholded alignment (not the Viterbi one) against the reference.
                a = this.trainer.computeAlignmentPosteriors(value).alignPosteriorThreshold(0.5f);
                reporter.incrCounter(AlignmentEvalEnum.SURE_HITS, ref.countSureHits(a));
                reporter.incrCounter(AlignmentEvalEnum.PROBABLE_HITS, ref.countProbableHits(a));
                reporter.incrCounter(AlignmentEvalEnum.HYPOTHESIZED_ALIGNMENT_POINTS, a.countAlignmentPoints());
                reporter.incrCounter(AlignmentEvalEnum.REF_ALIGNMENT_POINTS, ref.countSureAlignmentPoints());
            }
            // NOTE(review): astr is not emitted by this method; kept in case other code reads the field.
            this.astr.set(a.toString());
            output.collect(key, value);
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * E-step mapper: feeds each sentence pair to the trainer and, in {@link #close()}, emits the
     * trainer's accumulated partial counts keyed by {@link IntWritable}.
     *
     * <p>Pairs that carry a reference alignment are additionally scored (except for the uniform
     * Model 1 initialiser), and the totals are added to the {@link AlignmentEvalEnum} counters.
     */
    public static class EMapper
    extends AlignmentBase
    implements Mapper<Text, PhrasePair, IntWritable, PartialCountContainer> {
        /** Collector captured on the first map() call; close() writes the partial counts to it. */
        OutputCollector<IntWritable, PartialCountContainer> output_ = null;

        /**
         * Processes one training instance.
         *
         * <p>The first call captures {@code output}, builds the model via {@link #init()} and
         * registers an {@link AEListener}. When the parameter server is in use, the pair is first
         * passed to {@code PServerClient.query} (presumably to fetch the t-table entries it needs;
         * confirm against PServerClient).
         *
         * @param key unused record key
         * @param value sentence pair to train on
         * @param output collector retained for {@link #close()}; nothing is emitted here
         * @param reporter receives the evaluation counters and is passed to the trainer
         * @throws IOException if initialisation or the parameter-server query fails
         */
        public void map(Text key, PhrasePair value, OutputCollector<IntWritable, PartialCountContainer> output, Reporter reporter) throws IOException {
            if (this.output_ == null) {
                this.output_ = output;
                this.init();
                this.trainer.addAlignmentListener(new AEListener(reporter));
            }
            if (usePServer && this.ttable != null) {
                ((PServerClient)this.ttable).query(value, this.useNullWord);
            }
            AlignmentPosteriorGrid model1g = null;
            if (value.hasAlignmentPosteriors()) {
                model1g = value.getAlignmentPosteriorGrid();
            }
            if (this.trainer instanceof HMM) {
                ((HMM)this.trainer).setModel1Posteriors(model1g);
            }
            this.trainer.processTrainingInstance(value, reporter);
            if (value.hasAlignment() && !(this.trainer instanceof Model1_InitUniform)) {
                // NOTE(review): the Viterbi alignment itself is discarded; the call is kept only in
                // case it notifies listeners (e.g. unalignable pairs). Drop it if it has no side effects.
                this.trainer.viterbiAlign(value, new PerplexityReporter());
                Alignment a = this.trainer.computeAlignmentPosteriors(value).alignPosteriorThreshold(0.5f);
                ReferenceAlignment ref = (ReferenceAlignment)value.getAlignment();
                reporter.incrCounter(AlignmentEvalEnum.SURE_HITS, ref.countSureHits(a));
                reporter.incrCounter(AlignmentEvalEnum.PROBABLE_HITS, ref.countProbableHits(a));
                reporter.incrCounter(AlignmentEvalEnum.HYPOTHESIZED_ALIGNMENT_POINTS, a.countAlignmentPoints());
                reporter.incrCounter(AlignmentEvalEnum.REF_ALIGNMENT_POINTS, ref.countSureAlignmentPoints());
            }
            this.hasCounts = true;
        }

        /**
         * Calls {@code trainer.clearModel()} and then writes the trainer's partial counts to the
         * collector captured in {@link #map}. This does nothing if map() never ran.
         *
         * @throws RuntimeException wrapping (with cause) any IOException raised while writing
         */
        @Override
        public void close() {
            if (!this.hasCounts) {
                return;
            }
            try {
                this.trainer.clearModel();
                this.trainer.writePartialCounts(this.output_);
            }
            catch (IOException e) {
                throw new RuntimeException("Caught: " + e, e);
            }
        }
    }

    public static class AlignmentBase
    extends MapReduceBase {
        Path ltp = null;
        AlignmentModel trainer = null;
        boolean useNullWord = false;
        boolean hasCounts = false;
        String trainerType = null;
        int iteration = -1;
        HadoopAlignConfig job = null;
        FileSystem ttfs = null;
        TTable ttable = null;
        boolean generatePosteriors = false;

        public void configure(JobConf j) {
            this.job = new HadoopAlignConfig((Configuration)j);
            this.generatePosteriors = j.getBoolean("ha.generate.posteriors", false);
            try {
                this.ttfs = FileSystem.get((Configuration)this.job);
            }
            catch (IOException e) {
                throw new RuntimeException("Caught " + e);
            }
            Object localFiles = null;
            this.trainerType = this.job.get(HadoopAlign.KEY_TRAINER);
            if (this.trainerType == null || this.trainerType.equals("")) {
                throw new RuntimeException("Missing key: ha.trainer");
            }
            String it = this.job.get(HadoopAlign.KEY_ITERATION);
            if (it == null || it.equals("")) {
                throw new RuntimeException("Missing key: ha.model.iteration");
            }
            this.iteration = Integer.parseInt(it);
            this.ltp = localFiles != null && (localFiles).length > 0 ? localFiles[0] : this.job.getTTablePath();
        }

        public void init() throws IOException {
            String pserveHost = this.job.get("ha.pserver.host");
            pserveHost = "localhost";
            String sp = this.job.get("ha.pserver.port");
            int pservePort = 5444;
            if (sp != null) {
                pservePort = Integer.parseInt(sp);
            }
            this.useNullWord = this.job.includeNullWord();
            if (this.trainerType.equals(HadoopAlign.MODEL1_UNIFORM_INIT)) {
                this.trainer = new Model1_InitUniform(this.useNullWord);
            } else if (this.trainerType.equals(HadoopAlign.MODEL1_TRAINER)) {
                this.ttable = usePServer ? new PServerClient(pserveHost, pservePort) : new TTable_monolithic_IFAs(this.ttfs, this.ltp, true);
                this.trainer = new Model1(this.ttable, this.useNullWord);
            } else if (this.trainerType.equals(HadoopAlign.HMM_TRAINER)) {
                this.ttable = usePServer ? new PServerClient(pserveHost, pservePort) : new TTable_monolithic_IFAs(this.ttfs, this.ltp, true);
                ATable atable = HadoopAlign.loadATable(this.job.getATablePath(), this.job);
                this.trainer = !this.useNullWord ? new HMM(this.ttable, atable) : new HMM_NullWord(this.ttable, atable, this.job.getHMMp0());
            } else {
                throw new RuntimeException("Don't understand initialization stategy: " + this.trainerType);
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * Hadoop counter keys that the mappers use to total alignment-evaluation figures for
     * sentence pairs that carry a reference alignment.
     */
    public enum AlignmentEvalEnum {
        /** Sum of {@code ReferenceAlignment.countSureHits} over scored pairs. */
        SURE_HITS,
        /** Sum of {@code ReferenceAlignment.countProbableHits} over scored pairs. */
        PROBABLE_HITS,
        /** Sum of {@code Alignment.countAlignmentPoints} for the hypothesised alignments. */
        HYPOTHESIZED_ALIGNMENT_POINTS,
        /** Sum of {@code ReferenceAlignment.countSureAlignmentPoints} over scored pairs. */
        REF_ALIGNMENT_POINTS
    }

    /**
     * Listener that reports each unalignable sentence pair to Hadoop. It bumps the
     * {@code CrossEntropyCounters.INFINITIES} counter by one and prints the pair to stderr.
     */
    protected static class AEListener
    implements AlignmentEventListener {
        private final Reporter reporter;

        /**
         * @param rep task reporter that receives the counter increments
         */
        public AEListener(Reporter rep) {
            reporter = rep;
        }

        /**
         * Counts and logs a pair the trainer could not align.
         *
         * @param pp the offending pair, printed via its toString()
         * @param reason supplied by the caller; currently not reported
         */
        public void notifyUnalignablePair(PhrasePair pp, String reason) {
            reporter.incrCounter(CrossEntropyCounters.INFINITIES, 1L);
            String message = "Can't align " + pp;
            System.err.println(message);
        }
    }
}

