June 27, 2022

How to Use LZO Compression in Hadoop

This post shows how to install and use LZO compression in Hadoop. The topics covered are-

  1. Installing LZO packages.
  2. Downloading and packaging hadoop-lzo. Using hadoop-lzo makes LZO compressed files splittable when used as input to a MapReduce job.
  3. Configuring LZO packages so that you can use LZO compression in Hadoop.
  4. Java program that compresses a file using LzopCodec.
  5. An example showing LZO compression in Hadoop MapReduce.
  6. How to index an .lzo file to make it splittable.

Installing LZO packages

To install the LZO packages on Ubuntu, use the following command.

sudo apt-get install liblzo2-2 liblzo2-dev

Downloading and packaging hadoop-lzo

You will need the hadoop-lzo jar in order to make LZO files splittable. For that you will need to clone the hadoop-lzo repository and build it.

Another option is to use the rpm package which you can download from here- https://code.google.com/archive/p/hadoop-gpl-packing/downloads

Here I am showing the steps for cloning and building it. Refer to this URL- https://github.com/twitter/hadoop-lzo for further details.

Maven is also required for packaging the cloned code. If you don’t have Maven installed, you can install it on your system using the following command.

$ sudo apt install maven

Clone the hadoop-lzo repository.

$ git clone https://github.com/twitter/hadoop-lzo.git

In order to compile the code and build the hadoop-lzo jar, change to your cloned hadoop-lzo directory and run the following commands.

mvn clean
mvn install

This should create a target folder containing the built jar- hadoop-lzo-0.4.21-SNAPSHOT.jar.

Configuration for using LZO compression with Hadoop

Since you are going to use LZO compression with MapReduce jobs, copy the hadoop-lzo jar to $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/lib.

sudo cp /home/knpcode/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar $HADOOP_INSTALLATION_DIR/share/hadoop/mapreduce/lib

Also add the jar to the Hadoop classpath. For that, add the following lines in $HADOOP_INSTALLATION_DIR/etc/hadoop/hadoop-env.sh

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/knpcode/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar

export JAVA_LIBRARY_PATH=/home/knpcode/hadoop-lzo/target/native/Linux-amd64-64:$HADOOP_INSTALLATION_DIR/lib/native

You will also need to update the configuration file $HADOOP_INSTALLATION_DIR/etc/hadoop/core-site.xml to register external codecs for LZO.

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, 
    org.apache.hadoop.io.compress.BZip2Codec, com.hadoop.compression.lzo.LzoCodec, 
    com.hadoop.compression.lzo.LzopCodec
  </value>
</property>
<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
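
To verify that the LZO codecs registered in core-site.xml are actually picked up, you can resolve a codec by its file extension through CompressionCodecFactory. The following is a minimal sketch (the CheckLzoCodec class name and the file name in it are just placeholders, not part of the original setup); it needs the hadoop-lzo jar on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CheckLzoCodec {
  public static void main(String[] args) {
    // Loads core-site.xml from the Hadoop configuration on the classpath
    Configuration conf = new Configuration();
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    // Resolve the codec from the file extension; only the .lzo suffix matters here
    CompressionCodec codec = factory.getCodec(new Path("anyfile.lzo"));
    if (codec == null) {
      System.out.println("LZO codec is not registered");
    } else {
      // Should print com.hadoop.compression.lzo.LzopCodec if registration worked
      System.out.println("Resolved codec: " + codec.getClass().getName());
    }
  }
}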

Example Java program to use LZO compression in Hadoop

Here is a Java program that compresses a file using LzopCodec. The input file is in the local file system whereas the compressed output file is stored in HDFS.

Make sure that you have added the built hadoop-lzo jar as an external jar in the Java build path.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class LzoCompress {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    InputStream in = null;
    OutputStream out = null;
    try {
      FileSystem fs = FileSystem.get(conf);
      // Input file from local file system
      in = new BufferedInputStream(new FileInputStream("/home/knpcode/Documents/knpcode/Hadoop/Test/data.txt"));
      //Compressed Output file
      Path outFile = new Path("/user/compout/data.lzo");
      // Fail if the output file already exists
      if (fs.exists(outFile)) {
        System.out.println("Output file already exists");
        throw new IOException("Output file already exists");
      }
      out = fs.create(outFile);

      CompressionCodecFactory factory = new CompressionCodecFactory(conf);
      CompressionCodec codec = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec");
      CompressionOutputStream compressionOutputStream = codec.createOutputStream(out);
      
      try {
        IOUtils.copyBytes(in, compressionOutputStream, 4096, false);
        compressionOutputStream.finish();
        
      } finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(compressionOutputStream);
      }
			
    } catch (IOException e) {
      e.printStackTrace();
    }
    }
  }
}
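
For completeness, reading the compressed file back follows the same pattern in reverse- the HDFS input stream is wrapped with the codec's createInputStream. Here is a minimal sketch assuming the same HDFS path created above; the LzoDecompress class name and the local destination path are just examples.

import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class LzoDecompress {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Compressed file in HDFS (created by LzoCompress above)
    Path inFile = new Path("/user/compout/data.lzo");
    // Resolve the codec from the file extension (.lzo resolves to LzopCodec)
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(inFile);
    if (codec == null) {
      System.err.println("No codec found for " + inFile);
      return;
    }
    // Wrap the HDFS input stream with the decompressing stream
    InputStream in = codec.createInputStream(fs.open(inFile));
    // Example local destination for the decompressed data
    OutputStream out = new FileOutputStream("/home/knpcode/Documents/knpcode/Hadoop/Test/data_uncompressed.txt");
    try {
      IOUtils.copyBytes(in, out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}
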
Executing program in Hadoop environment

To execute the LzoCompress program in the Hadoop environment, you will need to add the directory containing the program's .class file to Hadoop’s classpath.

$ export HADOOP_CLASSPATH='/huser/eclipse-workspace/knpcode/bin'

I have my LzoCompress.class file in location /huser/eclipse-workspace/knpcode/bin so I have exported that path.

Then you can run the program using the following command-

$ hadoop org.knpcode.LzoCompress

Just to check how many blocks are occupied by the compressed file, run hdfs fsck on it.

hdfs fsck /user/compout/data.lzo

.Status: HEALTHY
 Total size:	417954415 B
 Total dirs:	0
 Total files:	1
 Total symlinks:		0
 Total blocks (validated):	4 (avg. block size 104488603 B)
 Minimally replicated blocks:	4 (100.0 %)

FSCK ended at Sat Mar 24 20:08:33 IST 2018 in 8 milliseconds

As you can see, the file is big enough to occupy 4 HDFS blocks. That will help us check whether MapReduce is able to create splits for the compressed file or not.

Using LZO Compression in Hadoop MapReduce

Let’s create a simple MapReduce job that uses the created .lzo file as input. In order to use an LZO compressed file as input in Hadoop MapReduce, the input format that has to be used is LzoTextInputFormat.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.hadoop.mapreduce.LzoTextInputFormat;

public class LzoWordCount extends Configured implements Tool{
  // Map function
  public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      for (String str : stringArr) {
        word.set(str);
        context.write(word, new IntWritable(1));
      }       
    }
  }
	
  // Reduce function
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{		   
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args)  throws Exception{    
    int exitFlag = ToolRunner.run(new LzoWordCount(), args);
    System.exit(exitFlag);
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "WC");
    job.setJarByClass(LzoWordCount.class);
    job.setMapperClass(MyMapper.class);    
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(LzoTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    //job.addFileToClassPath(new Path("/home/knpcode/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar")); 
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

If you run this MapReduce job you can see that only one split is created.

$ hadoop jar /home/knpcode/Documents/knpcode/Hadoop/lzowordcount.jar org.knpcode.LzoWordCount /user/compout/data.lzo /user/output1

18/03/25 19:14:09 INFO input.FileInputFormat: Total input files to process : 1
18/03/25 19:14:10 INFO mapreduce.JobSubmitter: number of splits:1

The Map task is not able to split the LZO compressed file, so it uses the whole file as one input split, which means only one Map task will process the whole file. In order to make the LZO file splittable you will have to run the indexer. You can run the LZO indexer as a Java program or as a MapReduce job.

Running the LZO indexer as a Java program
$ hadoop jar /home/knpcode/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.LzoIndexer /user/compout/data.lzo
Running the LZO indexer as a MapReduce job
$ hadoop jar /home/knpcode/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.DistributedLzoIndexer /user/compout/data.lzo

Either way it should create an index file (/user/compout/data.lzo.index), which means your .lzo file is successfully indexed and is splittable now. To check it, run the MapReduce job again.

hadoop jar /home/knpcode/Documents/knpcode/Hadoop/lzowordcount.jar org.knpcode.LzoWordCount /user/compout/data.lzo /user/output2

18/03/25 19:25:22 INFO input.FileInputFormat: Total input files to process : 1
18/03/25 19:25:22 INFO mapreduce.JobSubmitter: number of splits:4

In the console output you can see that the Map phase is now able to create 4 input splits corresponding to the 4 HDFS blocks.
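
If you want the job output to be LZO compressed as well, that can be configured on the job through FileOutputFormat. A minimal sketch of the extra lines, assuming the same LzoWordCount driver shown above-

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.hadoop.compression.lzo.LzopCodec;

// Inside LzoWordCount.run(), after the existing job configuration
FileOutputFormat.setCompressOutput(job, true);                   // enable output compression
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class); // write LZOP (.lzo) output files

Note that .lzo files written this way would themselves need to be indexed before a later job can split them.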


That's all for the topic How to Use LZO Compression in Hadoop. If something is missing or you have something to share about the topic please write a comment.

