当前位置:首页 > PHP教程 > php高级应用 > 列表

用PHP和Shell写Hadoop的MapReduce程序

发布:smiling 来源: PHP粉丝网  添加日期:2020-11-15 19:53:51 浏览: 评论:0 

Hadoop本身是Java写的,所以,给hadoop写mapreduce,人们会自然地想到Java。但Hadoop里面有个contrib叫做hadoop streaming,这是一个小工具,为hadoop提供streaming支持。

使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper或者 reducer。例如:

hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc

在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer,是不是很神奇?

如果你习惯了使用一些动态语言,用动态语言来写mapreduce吧,跟之前的编程没有任何不同,hadoop只是运行它的一个框架,下面我演示一下用PHP来实现Word Counter的mapreduce。

一、找到Streaming jar

Hadoop根目录下是没有hadoop-streaming.jar的,因为streaming是一个contrib,所以要去contrib下面找,以hadoop-0.20.2为例,它在这里:

$HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar

二、写Mapper

新建一个wc_mapper.php,写入如下代码:

  1. #!/usr/bin/php 
  2. <?php 
  3. $in = fopen(“php://stdin”, “r”); 
  4. $results = array(); 
  5. while ( $line = fgets($in, 4096) ) 
  6. $words = preg_split(‘/\W/', $line, 0, PREG_SPLIT_NO_EMPTY); 
  7. foreach ($words as $word
  8. $results[] = $word
  9. fclose($in); 
  10. foreach ($results as $key => $value
  11. print “$value\t1\n”; 

这段代码的大致意思是:把输入的每行文本中的单词找出来,并以”

hello 1

world 1″

这样的形式输出出来。

和之前写的PHP基本没有什么不同,对吧,可能稍微让你感到陌生有两个地方:

PHP作为可执行程序

第一行的“#!/usr/bin/php”告诉linux,要用/usr/bin/php这个程序作为以下代码的解释器。写过linux shell的人应该很熟悉这种写法了,每个shell脚本的第一行都是这样: #!/bin/bash, #!/usr/bin/python

有了这一行,保存好这个文件以后,就可以像这样直接把wc_mapper.php当作cat, grep一样的命令执行了:./wc_mapper.php

使用stdin接收输入

PHP支持多种参数传入的方法,大家最熟悉的应该是从$_GET, $_POST超全局变量里面取通过Web传递的参数,次之是从$_SERVER['argv']里取通过命令行传入的参数,这里,采用的是标准输入stdin

它的使用效果是:

在linux控制台输入 ./wc_mapper.php

wc_mapper.php运行,控制台进入等候用户键盘输入状态

用户通过键盘输入文本

用户按下Ctrl + D终止输入,wc_mapper.php开始执行真正的业务逻辑,并将执行结果输出

那么stdout在哪呢?print本身已经就是stdout啦,跟我们以前写web程序和CLI脚本没有任何不同。

三、写Reducer

新建一个wc_reducer.php,写入如下代码:

  1. #!/usr/bin/php 
  2. <?php 
  3. $in = fopen(“php://stdin”, “r”); 
  4. $results = array(); 
  5. while ( $line = fgets($in, 4096) ) 
  6. list($key$value) = preg_split(“/\t/”, trim($line), 2); 
  7. $results[$key] += $value
  8. fclose($in); 
  9. ksort($results); 
  10. foreach ($results as $key => $value
  11. print “$key\t$value\n”; 

这段代码的大意是统计每个单词出现了多少次,并以”

hello 2

world 1″

这样的形式输出。

四、用Hadoop来运行

上传要统计的示例文本,代码如下:

hadoop fs -put *.TXT /tmp/input

以Streaming方式执行PHP mapreduce程序,代码如下:

hadoop jar hadoop-0.20.2-streaming.jar -input /tmp/input -output /tmp/output -mapper wc_mapper.php的绝对路径 -reducer wc_reducer.php的绝对路径

注意:

input和output目录是在hdfs上的路径

mapper和reducer是在本地机器的路径,一定要写绝对路径,不要写相对路径,以免到时候hadoop报错说找不到mapreduce程序。

查看结果,代码如下:

hadoop fs -cat /tmp/output/part-00000

五、shell版的Hadoop MapReduce程序,代码如下:

  1. #!/bin/bash - 
  2.  
  3. # 加载配置文件 
  4. source './config.sh' 
  5.  
  6. # 处理命令行参数 
  7. while getopts "d:" arg 
  8. do 
  9.  case $arg in 
  10.   d) 
  11.    date=$OPTARG 
  12.  
  13.   ?) 
  14.             echo "unkonw argument" 
  15.    exit 1 
  16.  
  17.     esac 
  18. done 
  19.  
  20. # 默认处理日期为昨天 
  21. default_date=`date -v-1d +%Y-%m-%d` 
  22.  
  23. # 最终处理日期. 如果日期格式不对, 则退出执行 
  24. date=${date:-${default_date}} 
  25. if ! [[ "$date" =~ [12][0-9]{3}-(0[1-9]|1[12])-(0[1-9]|[12][0-9]|3[01]) ]] 
  26. then 
  27.  echo "invalid date(yyyy-mm-dd): $date" 
  28.  exit 1 
  29. fi 
  30.  
  31. # 待处理文件 
  32. log_files=$(${hadoop_home}bin/hadoop fs -ls ${log_file_dir_in_hdfs} | awk '{print $8}' | grep $date
  33.  
  34. # 如果待处理文件数目为零, 则退出执行 
  35. log_files_amount=$(($(echo $log_files | wc -l) + 0)) 
  36. if [ $log_files_amount -lt 1 ] 
  37. then 
  38.  echo "no log files found" 
  39.  exit 0 
  40. fi 
  41.  
  42. # 输入文件列表 
  43. for f in $log_files 
  44. do 
  45.  input_files_list="${input_files_list} $f" 
  46. done 
  47.  
  48. function map_reduce () { 
  49.  if ${hadoop_home}bin/hadoop jar ${streaming_jar_path} -input${input_files_list} -output ${mapreduce_output_dir}${date}/${1}/ -mapper "${mapper} ${1}" -reducer "${reducer}" -file "${mapper}" 
  50.  then 
  51.   echo "streaming job done!" 
  52.  else 
  53.   exit 1 
  54.  fi 
  55.  
  56. # 循环处理每一个bucket 
  57. for bucket in ${bucket_list[@]} 
  58. do 
  59.  map_reduce $bucket 
  60. done 

Tags: Shell Hadoop MapReduce

分享到: