Why does a spill failure happen for a custom data type in Hadoop?
In Hadoop I am writing my custom data type as below:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Movie implements WritableComparable<Movie> {

    String movieId;
    String movieTitle;

    public Movie(String movieId, String movieTitle) {
        super();
        this.movieId = movieId;
        this.movieTitle = movieTitle;
    }

    public Movie() {
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public String getMovieTitle() {
        return movieTitle;
    }

    public void setMovieTitle(String movieTitle) {
        this.movieTitle = movieTitle;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        movieId = in.readLine();
        movieTitle = in.readLine();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeChars(movieId);
        out.writeChars(movieTitle);
    }

    @Override
    public int compareTo(Movie o) {
        return movieTitle.compareTo(o.movieTitle);
    }

    @Override
    public int hashCode() {
        return movieId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        Movie m = (Movie) o;
        return movieId.equals(m.movieId);
    }

    @Override
    public String toString() {
        return movieTitle;
    }
}
Below is my mapper code:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MovieMapper extends Mapper<LongWritable, Text, Movie, MovieRating> {

    Map<String, List<String>> movieMap = new HashMap<String, List<String>>();

    @Override
    public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        String[] columns = value.toString().split("::");
        if (columns.length != 4) {
            System.out.println("length not equal to 4");
            return;
        }
        if (movieMap.containsKey(columns[1])) {
            List<String> mList = movieMap.get(columns[1]);
            // set movie
            //System.out.println("In mapper, movieId=" + mList.get(0) + ", name=" + mList.get(1));
            Movie movie = new Movie(mList.get(0), mList.get(1));
            //movie.setMovieId(mList.get(0));
            //movie.setMovieTitle(mList.get(1));
            // set MovieRating
            MovieRating mr = new MovieRating();
            mr.setUserId(columns[0]);
            mr.setRating(Integer.parseInt(columns[2]));
            mr.setTime(columns[3]);
            ctx.write(movie, mr);
        }
    }

    @Override
    protected void setup(Context ctx) throws IOException {
        loadMovieData(ctx);
    }

    public void loadMovieData(Context ctx) throws IOException {
        URI[] cacheFiles = DistributedCache.getCacheFiles(ctx.getConfiguration());
        System.out.println("in loadMovieData");
        if (cacheFiles != null && cacheFiles.length > 0) {
            System.out.println("Cache files length greater than 0");
            for (URI path : cacheFiles) {
                System.out.println("Cache file=" + path.toString());
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(path.toString()));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String[] columns = line.split("::");
                        movieMap.put(columns[0], new ArrayList<String>(Arrays.asList(columns)));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    reader.close();
                }
            }
        }
    }
}
In the mapper, when control reaches ctx.write(movie, mr), the job fails with a "spill failed" error. My reducer takes Movie as the input key and MovieRating as the input value.
Because you read lines (readLine() looks for a \n in the stream) but you write characters (writeChars() never writes a \n). The read side never finds the field boundaries it expects, so deserializing the keys during the map-side sort fails and the spill fails with it.
Your methods should look like this:
@Override
public void readFields(DataInput in) throws IOException {
movieId = in.readUTF();
movieTitle = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(movieId);
out.writeUTF(movieTitle);
}
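The same rule applies to your value class: write() and readFields() must be exact mirrors, writing and reading the same fields in the same order with matching methods. Your MovieRating class isn't shown in the question, so the following is only a minimal sketch assuming it holds the userId, rating, and time values that the mapper sets; the field types here are assumptions:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical sketch of MovieRating (the real class is not shown in the question).
public class MovieRating implements Writable {

    String userId; // assumed String, from mr.setUserId(columns[0])
    int rating;    // assumed int, from mr.setRating(Integer.parseInt(columns[2]))
    String time;   // assumed String, from mr.setTime(columns[3])

    public void setUserId(String userId) { this.userId = userId; }
    public void setRating(int rating) { this.rating = rating; }
    public void setTime(String time) { this.time = time; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);   // length-prefixed string
        out.writeInt(rating);
        out.writeUTF(time);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userId = in.readUTF();  // reads back exactly what writeUTF wrote
        rating = in.readInt();
        time = in.readUTF();
    }
}
writeUTF() is convenient here because it length-prefixes the string, so readUTF() knows exactly how many bytes to consume. That symmetry is what writeChars()/readLine() lacked: writeChars() leaves no marker for readLine() to stop at.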