Advertisements
Home > Information Technology, NOSQL > TPCH-1 Query on Hadoop MapReduce

TPCH-1 Query on Hadoop MapReduce

For those of you who don't know what Hadoop is, you can read the following article: http://hadoop.apache.org/. In this task, we want to convert the following query into Hadoop MapReduce programming format:

SELECT

l_returnflag,

l_linestatus,

avg(l_quantity) as avg_qty,

avg(l_extendedprice) as avg_price,

avg(l_discount) as avg_disc,

count(*) as count_order

FROM

lineitem

WHERE

l_shipdate <= date '1995-12-01'

GROUP BY

l_returnflag,

l_linestatus

ORDER BY

l_returnflag,

l_linestatus;

 

Source code:

HadoopMapper.java

/*

* HadoopMapper.java

*

* Created on Oct 29, 2010, 8:02:25 PM

*/
import java.io.IOException;

import java.util.Calendar;

import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;
/*

*

*

* @author beta13

*/

public class TPCHMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

private Text word = new Text();

@Override

public void map(LongWritable key, Text value,OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line,”|”);

int i = 1;

Text head = new Text();

Text tail = new Text();

Calendar c = Calendar.getInstance();

c.set(1995, 12, 1);

while (tokenizer.hasMoreTokens()) {

String token = tokenizer.nextToken();

if(i == 6) {

tail.set(tail.toString()+” “+token);

}

else if (i == 7) {

tail.set(tail.toString()+” “+token);

}

else if ( i == 8 ) {

tail.set(tail.toString()+” “+token);

}

else if (i == 9) {

head.set(head.toString()+” “+token);

}

else if (i == 10) {

head.set(head.toString()+” “+token);

}

else if (i == 11) {

StringTokenizer data = new StringTokenizer(token,”-“);

Calendar d = Calendar.getInstance();

d.set(Integer.parseInt(data.nextToken()),Integer.parseInt(data.nextToken()),Integer.parseInt(data.nextToken()));

if(c.getTimeInMillis() > d.getTimeInMillis()) {

output.collect(head, tail);

}

}

i++;

}

}

}

HadoopReducer.java

/*

* HadoopReducer.java

*

* Created on Oct 29, 2010, 8:02:55 PM

*/

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;
/*

*

*

* @author beta13

*/

public class TPCHReducer extends MapReduceBase implements Reducer<Text,Text,Text,Text> {

public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

double avgQuantity = 0;

double avgPrice = 0;

double avgDisc = 0;

int count = 0;

while (values.hasNext()) {

StringTokenizer token = new StringTokenizer(values.next().toString());

avgQuantity += Double.parseDouble(token.nextToken());

avgPrice += Double.parseDouble(token.nextToken());

avgDisc += Double.parseDouble(token.nextToken());

count += 1;

}

avgQuantity = avgQuantity/count;

avgPrice = avgPrice/count;

avgDisc = avgDisc/count;

Text newValue = new Text(“”+avgQuantity+” “+avgPrice+” “+avgDisc+” “+count);

output.collect(key, newValue);

}

}

HadoopJob.java

/*

* HadoopJob.java

*

* Created on Oct 29, 2010, 7:59:46 PM

*/

import java.util.concurrent.Callable;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.RunningJob;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;
/*

*

*

* @author beta13

*/

public class TPCH {

public static void main(String[] args) throws Exception {

long millies = System.currentTimeMillis();

JobConf conf = new JobConf(HadoopJob.class);

conf.setJobName(“HadoopJob”);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(Text.class);

conf.setMapperClass(HadoopMapper.class);

conf.setCombinerClass(HadoopReducer.class);

conf.setReducerClass(HadoopReducer.class);

conf.setInputFormat(TextInputFormat.class);

conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(“/home/kaza-sou/Documents/TPCH/lineitem.tbl”));

FileOutputFormat.setOutputPath(conf, new Path(“/home/kaza-sou/Documents/TPCH/OUTPUT”));

JobClient.runJob(conf);

System.out.println(“The system runs in: ” + (System.currentTimeMillis()-millies)+” ms”);

}

}

Result:

A F 38272.749223306426 0.04998324359172529 0.040000495591077205 23

N F 38284.078517238326 0.050101390955290544 0.039984057510332634 23

N O 38230.10944549784 0.049961352220490966 0.03995212254846041 23

R F 38250.801993849695 0.05001099672879702 0.03998501749531486 23

Advertisements
  1. No comments yet.
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

w

Connecting to %s

%d bloggers like this: