Why does spill failure happen for a custom data type in Hadoop?

In Hadoop I am writing my custom data type as below:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Movie implements WritableComparable<Movie> {

    String movieId;
    String movieTitle;

    public Movie(String movieId, String movieTitle) {
        super();
        this.movieId = movieId;
        this.movieTitle = movieTitle;
    }

    public Movie(){

    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public String getMovieTitle() {
        return movieTitle;
    }

    public void setMovieTitle(String movieTitle) {
        this.movieTitle = movieTitle;
    }



    @Override
    public void readFields(DataInput in) throws IOException {
        movieId = in.readLine();
        movieTitle=in.readLine();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeChars(movieId);
        out.writeChars(movieTitle);
    }

    @Override
    public int compareTo(Movie o) {

        return movieTitle.compareTo(o.movieTitle);

    }

    @Override
    public int hashCode(){
        return movieId.hashCode();
    }

    @Override
    public boolean equals(Object o){
        Movie m=(Movie)o; 
        return movieId.equals(m.movieId);
    }

    @Override
    public String toString(){
        return movieTitle;
    }

}

Below is my mapper code

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MovieMapper extends Mapper<LongWritable, Text, Movie, MovieRating> {

    Map<String, List<String>> movieMap = new HashMap<String, List<String>>();

    @Override
    public void map(LongWritable key,Text value,Context ctx) throws IOException, InterruptedException{
        String[] columns=value.toString().split("::");
        if(columns.length!=4){
            System.out.println("length not equal to 4");
            return;
        }
        if(movieMap.containsKey(columns[1])){
            List<String> mList = movieMap.get(columns[1]);
            // set movie
            //System.out.println("In mapper, movieId="+mList.get(0)+", name="+mList.get(1));
            Movie movie=new Movie(mList.get(0),mList.get(1));
            //movie.setMovieId(mList.get(0));
            //movie.setMovieTitle(mList.get(1));
            // set MovieRating
            MovieRating mr=new MovieRating();
            mr.setUserId(columns[0]);
            mr.setRating(Integer.parseInt(columns[2]));
            mr.setTime(columns[3]);
            ctx.write(movie,mr);
        }
    }


    @Override
    protected void setup(Context ctx) throws IOException {
        loadMovieData(ctx);
    }

    public void loadMovieData(Context ctx) throws IOException{
        URI[] cacheFiles = DistributedCache.getCacheFiles(ctx.getConfiguration());
        System.out.println("inloadMovieData");
        if(cacheFiles!=null && cacheFiles.length>0){
            System.out.println("Cache files length greater then 0");
            for(URI path:cacheFiles){
                System.out.println("Cache file="+path.toString());
                BufferedReader reader=null;
                try{
                    reader=new BufferedReader(new FileReader(path.toString()));
                    String line;
                    while((line=reader.readLine())!=null){
                        String[] columns = line.split("::");
                        movieMap.put(columns[0], new ArrayList<String>(Arrays.asList(columns)));
                    }
                }catch(Exception e){
                    e.printStackTrace();
                }
                finally{
                    reader.close();
                }

            }


        }
    }

}

In the mapper class, when control reaches ctx.write(movie, mr), the job fails with a "spill failed" error. My reducer takes Movie as its input key and MovieRating as its input value.


Because you read lines (readLine() looks for a \n in the stream) but you write characters (writeChars() never writes a \n).

Your methods should look like this:

@Override
public void readFields(DataInput in) throws IOException {
    movieId = in.readUTF();
    movieTitle = in.readUTF();
}

@Override
public void write(DataOutput out) throws IOException {
    out.writeUTF(movieId);
    out.writeUTF(movieTitle);
}
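
To confirm that the two methods are now symmetric, you can round-trip a Movie instance through a byte buffer the same way the spill path serializes and deserializes map output. This is a minimal sketch, not part of the original post; the class name and the sample id/title values are just illustrative, and it assumes the corrected readUTF/writeUTF methods above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MovieRoundTripTest {
    public static void main(String[] args) throws IOException {
        Movie original = new Movie("1193", "One Flew Over the Cuckoo's Nest (1975)");

        // Serialize the way the map-side sort buffer would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize the way the spill/merge phase would.
        Movie copy = new Movie();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Expect: true (both fields survive the round trip).
        System.out.println(original.getMovieId().equals(copy.getMovieId())
                && original.getMovieTitle().equals(copy.getMovieTitle()));
    }
}

With the original readLine/writeChars pair, the read side never finds a line terminator, which is exactly what surfaces as the spill failure during the map-side sort.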