
I am writing a MapReduce program for matrix addition. Since it requires two input files, I am using MultipleInputs. I have the following classes:

MatAddMapper1.java

package mapred;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MatAddMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> 
{
    @Override
    public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
    {
        // Each input line is expected to be "<row> <col> <value>"
        String line = value.toString();
        String[] content = line.split(" ");
        // Key is the (i,j) index pair, so matching cells from both
        // matrices arrive at the same reducer
        String key1 = content[0] + " " + content[1];
        int val = Integer.parseInt(content[2]);
        context.write(new Text(key1), new IntWritable(val));
    }
}

MatAddMapper2.java is similar.
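
For reference, a minimal sketch of what MatAddMapper2 could look like: for element-wise addition the parsing logic is identical to MatAddMapper1, assuming the second matrix uses the same "i j value" line format:

package mapred;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MatAddMapper2 extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
    {
        // Same (i,j) keying as MatAddMapper1, applied to the second matrix
        String line = value.toString();
        String[] content = line.split(" ");
        String key1 = content[0] + " " + content[1];
        int val = Integer.parseInt(content[2]);
        context.write(new Text(key1), new IntWritable(val));
    }
}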

MatAddReducer.java

package mapred;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MatAddReducer
extends Reducer<Text, IntWritable, Text, IntWritable> 
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException 
    {
        // Sum the entries for this (i,j) cell, one from each input matrix
        int val = 0;
        for (IntWritable value : values) 
        {
            val = val + value.get();
        }
        context.write(key, new IntWritable(val));
    }
}

MatAddApp.java (Main class)

package mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MatAddApp extends Configured implements Tool
{


     public int run(String[] args) throws Exception 
     {
         // Use the configuration injected by ToolRunner rather than creating a new one
         Configuration conf = getConf();
         Job job = Job.getInstance(conf, "Matrix Addition");
         job.setJarByClass(MatAddApp.class);
         MultipleInputs.addInputPath(job,new Path(args[0]),TextInputFormat.class,MatAddMapper1.class);
         MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,MatAddMapper2.class);

         FileOutputFormat.setOutputPath(job, new Path(args[2]));
         job.setReducerClass(MatAddReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);

         return (job.waitForCompletion(true) ? 0 : 1);

     }

     public static void main(String[] args) throws Exception 
     {
         int ecode = ToolRunner.run(new MatAddApp(), args);
         System.exit(ecode);
     }

}

I am using Eclipse and created a jar file, MatAddition.jar. M.txt and N.txt are the input matrices. When I tried to run the program on my Hadoop cluster, I got the following error:

Exception in thread "main" java.lang.ClassNotFoundException: MatAddApp
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:214)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
  • Can you change job.setJarByClass(MatAddApp.class); to job.setJarByClass(mapred.MatAddApp.class);? Commented Jun 27, 2016 at 6:46
  • Thanks Sachin. That worked. Commented Jun 27, 2016 at 8:33

2 Answers


The issue is with the class name. The driver class lives in the mapred package, so it must be referenced by its fully qualified name:

 job.setJarByClass(mapred.MatAddApp.class);
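
The same applies when launching the job: the stack trace comes from RunJar, so the main class name given on the command line also needs the package prefix. A sketch, assuming the jar from the question and illustrative HDFS paths:

hadoop jar MatAddition.jar mapred.MatAddApp M.txt N.txt output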



Input.txt

A,0|0,1.0
A,0|1,2.0
A,0|2,3.0
A,0|3,4.0
A,1|0,5.0
A,1|1,6.0
A,1|2,7.0
A,1|3,8.0
B,0|0,1.0
B,0|1,2.0
B,0|2,3.0
B,0|3,4.0
B,1|0,5.0
B,1|1,6.0
B,1|2,7.0
B,1|3,8.0

Here, the first column is the name of the matrix, the second column is the row|column index, and the third is the cell value.

MatrixAdd.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MatrixAdd {
    public static class MatMapper extends Mapper<Object, Text, Text, DoubleWritable>{
        private Text index = new Text();
        private final static DoubleWritable num = new DoubleWritable();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{      
            // Input line: "<matrix>,<row|col>,<value>"; the matrix name in
            // parts[0] is ignored, so cells of A and B with the same index
            // meet at the same reducer key
            String record = value.toString();
            String[] parts = record.split(",");
            index.set(parts[1]);
            num.set(Double.parseDouble(parts[2]));
            context.write(index, num);
        }
    }
    public static class MatReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double sumValue = 0;
            for(DoubleWritable val: values) {
                sumValue += val.get();
            }
            result.set(sumValue);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "max temp");
        job.setJarByClass(MatrixAdd.class);
        job.setMapperClass(MatMapper.class);
        job.setCombinerClass(MatReducer.class);
        job.setReducerClass(MatReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }    
}
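
For reference, a job like this could be launched as follows (the jar name and output path are illustrative, not part of the answer):

hadoop jar MatrixAdd.jar MatrixAdd Input.txt output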

Output:

0|0 2.0
0|1 4.0
0|2 6.0
0|3 8.0
1|0 10.0
1|1 12.0
1|2 14.0
1|3 16.0

