04. MapReduce Example: Single-Table Join


Experiment Principle

Take this experiment's buyer1 table (buyer_id, friends_id) as the example for explaining the principle of a single-table join. The join connects the buyer_id column of the left table with the friends_id column of the right table, where the left table and the right table are the same table. In the map phase, after each input record is split into buyer_id and friends_id, the mapper sets buyer_id as the key and friends_id as the value and emits the pair directly as the left table; it then emits the same buyer_id/friends_id pair again with friends_id as the key and buyer_id as the value, as the right table. To distinguish left-table from right-table records in the output, extra information is prepended to the value: the character 1 at the start of the value string marks a left-table record, and the character 2 marks a right-table record. The map output therefore contains both tables, and the join is completed during the shuffle, which groups all records that share a key. Each key's value-list received by reduce thus encodes the "buyer_id friends_id -- friends_id buyer_id" relationship. The reducer parses each value-list, puts the buyer_ids from the left table into one array and the friends_ids from the right table into another, and the Cartesian product of the two arrays is the final result.
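
As a concrete walkthrough (the ids below are invented for illustration and are not taken from the real buyer1 data), suppose the input contains the records 10001,10002 and 10002,10003. The common key 10002 is what brings the two records together:

    map output:   (10001, "1+10002")   (10002, "2+10001")   <- from 10001,10002
                  (10002, "1+10003")   (10003, "2+10002")   <- from 10002,10003
    shuffle:      key 10002 -> ["2+10001", "1+10003"]
    reduce:       "1+" values -> [10003];  "2+" values -> [10001]
                  Cartesian product -> one output pair: 10003  10001

Only a key that collects both a "1+" and a "2+" value produces output, so keys 10001 and 10003 contribute nothing here.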

Experiment Steps

  1. Start the Hadoop services:

         start-all.sh
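
     To confirm the daemons came up, you can list the running JVM processes with jps (shipped with the JDK); which daemons appear depends on your cluster configuration:

         jps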

  2. Create a local working directory:

            mkdir -p /data/mapreduce7

  3. Upload the buyer1 file to that directory.
  4. Upload the hadoop2lib archive and unzip it:

         unzip hadoop2lib.zip

  5. Create the /mymapreduce7/in directory on HDFS, then import the buyer1 file from the local /data/mapreduce7 directory into /mymapreduce7/in on HDFS:

         hadoop fs -mkdir -p /mymapreduce7/in 

         hadoop fs -put /data/mapreduce7/buyer1 /mymapreduce7/in
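
     To verify the file landed in HDFS before moving on, you can list the target directory:

         hadoop fs -ls /mymapreduce7/in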

  6. Write the Java code in IDEA:

    package mapreduce4;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DanJoin {

        public static class Map extends Mapper<Object, Text, Text, Text> {
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] arr = line.split(",");
                String mapkey = arr[0];    // buyer_id
                String mapvalue = arr[1];  // friends_id
                // Left table: key = buyer_id, value flagged with "1"
                context.write(new Text(mapkey), new Text("1+" + mapvalue));
                // Right table: key = friends_id, value flagged with "2"
                context.write(new Text(mapvalue), new Text("2+" + mapkey));
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                int buyernum = 0;
                String[] buyer = new String[20];
                int friendsnum = 0;
                String[] friends = new String[20];
                Iterator<Text> ite = values.iterator();
                while (ite.hasNext()) {
                    String record = ite.next().toString();
                    if (record.length() == 0) {
                        continue;
                    }
                    char relationtype = record.charAt(0);
                    if (relationtype == '1') {
                        // Left-table record: store the friends_id part
                        buyer[buyernum] = record.substring(2);
                        buyernum++;
                    }
                    if (relationtype == '2') {
                        // Right-table record: store the buyer_id part
                        friends[friendsnum] = record.substring(2);
                        friendsnum++;
                    }
                }
                // The Cartesian product of the two arrays is the join result
                if (buyernum != 0 && friendsnum != 0) {
                    for (int m = 0; m < buyernum; m++) {
                        for (int n = 0; n < friendsnum; n++) {
                            if (!buyer[m].equals(friends[n])) {
                                context.write(new Text(buyer[m]), new Text(friends[n]));
                            }
                        }
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new String[2];
            otherArgs[0] = "hdfs://192.168.149.10:9000/mymapreduce7/in/buyer1";
            otherArgs[1] = "hdfs://192.168.149.10:9000/mymapreduce7/out";
            Job job = Job.getInstance(conf, "Table join");
            job.setJarByClass(DanJoin.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
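
     If you prefer to launch the job outside the IDE, one option (an assumption, not part of the original steps) is to package the class into a jar and submit it with hadoop jar; the jar name is arbitrary, and the input and output paths are already hardcoded in main():

         hadoop jar DanJoin.jar mapreduce4.DanJoin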

  7. Copy the jar packages from the hadoop2lib directory into the project's hadoop2lib directory and add them to the project's build path.
  8. Copy the log4j.properties file into the project.
  9. Run the program and check the results.
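
     Once the job finishes, the join result can be read directly from HDFS; part-r-00000 is the default output file name when a single reducer runs. Note that FileOutputFormat refuses to write into an existing directory, so remove /mymapreduce7/out before rerunning the job:

         hadoop fs -cat /mymapreduce7/out/part-r-00000
         hadoop fs -rm -r /mymapreduce7/out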