June 27, 2022

Avro MapReduce Example

This post shows an example MapReduce program written using the Avro MapReduce API.

As an example, a word count MapReduce program is used where the output is written as an Avro data file.

Required jars

avro-mapred-1.8.2.jar

Avro word count MapReduce example

Since the output is an Avro file, an Avro schema has to be defined. The schema has two fields, "word" and "count".
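
If you prefer not to embed the schema as a Java string, the same two-field schema can be kept in a separate .avsc file and loaded with Avro's Schema.Parser. A minimal sketch, assuming the schema JSON is saved as wordcount.avsc (a hypothetical file name):

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;

public class SchemaLoader {
  // Parses the WordCount schema from an external .avsc file
  // (wordcount.avsc is assumed to contain the same JSON schema used in the program below)
  public static Schema loadSchema() throws IOException {
    return new Schema.Parser().parse(new File("wordcount.avsc"));
  }
}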

In the code you can see the use of AvroKey and AvroValue for the key and value pairs. For the output, the AvroKeyOutputFormat class is used so that the job writes an Avro data file.

The AvroJob class is used in the job configuration to set the schemas for the map output and the final output of the MapReduce job.

Avro MapReduce
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroWordCount extends Configured implements Tool {
  // Avro schema for the output records - two fields, word and count
  private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(
      "{\n" +
      "  \"type\": \"record\",\n" +
      "  \"name\": \"WordCount\",\n" +
      "  \"doc\": \"word count\",\n" +
      "  \"fields\": [\n" +
      "    {\"name\": \"word\", \"type\": \"string\"},\n" +
      "    {\"name\": \"count\", \"type\": \"int\"}\n" +
      "  ]\n" +
      "}");
	
  // Map function - emits each word with an Avro record holding (word, 1)
  public static class AvroWordMapper extends Mapper<LongWritable, Text, AvroKey<Text>, 
      AvroValue<GenericRecord>>{
    private Text word = new Text();
    private GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        // creating Avro record
        record.put("word", str);
        record.put("count", 1);
        context.write(new AvroKey<Text>(word), new AvroValue<GenericRecord>(record));
      }
    }
  }
	
  // Reduce function - sums the counts for each word and writes the final Avro record
  public static class AvroWordReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>,
      AvroKey<GenericRecord>, NullWritable>{       
    public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord record = value.datum();
        sum += (Integer)record.get("count");
      }
      GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
      // key.datum() is a Hadoop Text, convert it to a String for the Avro string field
      record.put("word", key.datum().toString());
      record.put("count", sum);
      context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception{
    int exitFlag = ToolRunner.run(new AvroWordCount(), args);
    System.exit(exitFlag);
  }
	

  @Override
  public int run(String[] args) throws Exception {
    // Use the configuration supplied by ToolRunner
    Job job = Job.getInstance(getConf(), "AvroWC");
    job.setJarByClass(getClass());
    job.setMapperClass(AvroWordMapper.class);    
    job.setReducerClass(AvroWordReducer.class);
    
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputValueSchema(job, AVRO_SCHEMA);
    AvroJob.setOutputKeySchema(job, AVRO_SCHEMA);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

After creating the jar you can run this Avro MapReduce program using the following command, passing the input and output paths as arguments.

hadoop jar /home/knpcode/knpcodehadoop.jar org.knpcode.AvroWordCount /user/input/count /user/out/result

The program is executed on a simple text file having the following two lines.

This is a test file.
This is a Hadoop MapReduce program file.

The output file can be checked using the avro-tools jar, which can dump the Avro data file as JSON.

hadoop jar /path/to/avro-tools-1.8.2.jar tojson /user/out/result/part-r-00000.avro

{"word":"Hadoop","count":1}
{"word":"MapReduce","count":1}
{"word":"This","count":2}
{"word":"a","count":2}
{"word":"file.","count":2}
{"word":"is","count":2}
{"word":"program","count":1}
{"word":"test","count":1}

That's all for the topic Avro MapReduce Example. If something is missing or you have something to share about the topic please write a comment.

