// 2009/08/09 16:36
package RecommendationSystem.Crawler;

import java.io.IOException;
import java.net.URL;
import org.jdom.*;
import org.jdom.input.SAXBuilder;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;




public class Crawler {

    static String searchKeyword;
    static String searchValue;
    static String TestPath;
   
   
    public static class Map extends MapReduceBase implements Mapper <LongWritable, Text, Text, Text>
    {
        @SuppressWarnings("unchecked")
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter report)
                throws IOException {

            Text titleWord = new Text();
            Crawler.searchKeyword = "publisher";
            Crawler.searchValue = value.toString();
            Namespace NS = Namespace.getNamespace("" , "http://webservices.amazon.com/AWSECommerceService/2005-10-05");
          
            int totalPage = 0;
            //==================================================================================
            //        #1. 해당 출판사(value)의 책이 전체적으로 존재하는지 check.
            //==================================================================================
                try {                  
                    Document doc = readDocument( );
                    Element itemSearchResponseElement = doc.getRootElement();
                    Element itemsElement = itemSearchResponseElement.getChild("Items", NS);
                    Element totalPageElement = itemsElement.getChild("TotalPages", NS);
                 
                    totalPage = Integer.parseInt( totalPageElement.getValue() );
                
                    if(totalPage == 0)
                        return;
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                    return;
                }
              
                //====================================================================================
              
                if(totalPage <=400)
                {           
                    //==================================================================================
                    //        #2-1. 해당출판사(value)의 존재하는 책을 find & OutputCollector(해당출판사, title&author).
                    //==================================================================================
                    for (int page = 1; page <= totalPage; page++)
                    {
                        SAXBuilder builder= new SAXBuilder();
                        String urlString = "http://ecs.amazonaws.com/onca/xml?Service=AWSECommerceService&AWSAccessKeyId=AKIAIXRSXD4KYC2BIIIQ&Operation=ItemSearch&SearchIndex=Books&Power="
                            + Crawler.searchKeyword
                            + ":"
                            + Crawler.searchValue
                            + "&ItemPage="
                            + page ;
                      
                        Document doc = new Document();
                        try {
                           do{
                               doc = builder.build(new URL(urlString));
                                Thread.currentThread();
                                Thread.sleep(10);
                            }while(doc == null);
                       
                        } catch (JDOMException e1) {
                            e1.printStackTrace();
                            continue;
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                            continue;
                        }
                        Element rootElement = doc.getRootElement();
                        Element itemsElement = rootElement.getChild("Items", NS);
                        List<Element> itemElementList = itemsElement.getChildren("Item", NS);
                         for( int c = 0 ; c < itemElementList.size() ; c ++ )
                          {
                              try
                              {
                                  Element itemArrtibureElement = (itemElementList.get(c)).getChild("ItemAttributes", NS);
                                  Element title = itemArrtibureElement.getChild("Title", NS);
                                  List<Element> authorList = itemArrtibureElement.getChildren("Author", NS);
                                 
                                  String authorString = new String();
                                  for(int d = 0 ; d < authorList.size() ; d ++)
                                  {
                                      authorString += "*";
                                      authorString += authorList.get(d).getValue();
                                  }
                                  titleWord.set (title.getValue());
                                  Text authorWord = new Text();
                                  authorWord.set(authorString);
                                  output.collect( titleWord , authorWord );
                                  //System.out.println( titleWord.toString() + "    " + authorWord.toString() );
                              }
                              catch (Exception e)
                              {
                                  continue;
                              }
                          }
                      
                    }
                   
                }
              
                else
                {                  
                    //==================================================================================
                    //        #2-2. tatalpage가 400이 넘은 출판사는 Crawler2에서 처리하기위해 "#"로 표시.
                    //==================================================================================
                    Text starWord = new Text();
                    starWord.set("#");   
                    output.collect(starWord , value );
                    System.out.println(value.toString() );
                   
                }  
        }
    }
  
   
    public static class Reduce extends MapReduceBase implements Reducer <Text, Text, Text, Text>
    {
        @Override
        public void reduce(Text key, Iterator<Text> value,
                OutputCollector<Text, Text> output, Reporter report)
                throws IOException {
           
            //Crawler2 Dir 에 저장.           
            if( key.toString()  == "#" )
            {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(conf);
                Path crawler2Path;
                FSDataOutputStream out = null;
                int i=0;
               
                while (value.hasNext())
                {
                    do{
                        crawler2Path = new Path( "/Crawler2" + "/" + i++  + ".txt" );
                    }while( fs.exists(crawler2Path) );
                    out = fs.create( crawler2Path );
                    out.writeChars( value.next().toString() + "\n" );
                    out.close();
                }                
            }
           
           
            //====================================================================================
            else
            {
                Text word = new Text();
                word .set( value.next().toString() );
                output.collect( key, word );
            }
      
        }
    }


   
    public static boolean isKeyword(String s) {
        if ( s == "title"  )
            return true;

        if ( s == "keywords"  )
            return true;

        if ( s == "author" )
            return true;

        if ( s == "publisher" )
            return true;

        return false;
    }
  

    public static Document readDocument()
            throws IOException {
        try {
            SAXBuilder builder = new SAXBuilder();

            if (!isKeyword(Crawler.searchKeyword)) {
                System.out.println("wrong keyword1");
            }

            String urlString = "http://ecs.amazonaws.com/onca/xml?Service=AWSECommerceService&AWSAccessKeyId=AKIAIXRSXD4KYC2BIIIQ&Operation=ItemSearch&SearchIndex=Books&Power=";
            Document doc = builder.build(new URL(urlString + Crawler.searchKeyword
                    + ":" + Crawler.searchValue));

            return doc;

        } catch (JDOMException e) {
            e.printStackTrace();
        } catch (NullPointerException e) {
            e.printStackTrace();
        }

        return null;
    }

  
        public static void main(String[] args) throws IOException {
          
            if (args.length != 3)
            {
                System.out.println("Crawler [SearchList Dir] [Complete Dir] [Crawler2 Dir]");
                return ;
            }
   
            JobConf conf = new JobConf( Crawler.class );
            conf.setJobName( "crawler" );
           
            JobConf job = new JobConf(conf);
            FileSystem fs = FileSystem.get(job);
            fs.delete(new Path(args[1]), true);
           
            //OutputCollector
             conf.setOutputKeyClass(Text.class);
             conf.setOutputValueClass(Text.class);

             conf.setMapperClass(Map.class);
             conf.setReducerClass(Reduce.class);

             conf.setInputFormat(TextInputFormat.class);
             conf.setOutputFormat(TextOutputFormat.class);

             FileInputFormat.setInputPaths(conf, new Path(args[0]));
             FileOutputFormat.setOutputPath(conf, new Path(args[1]));
           
            
            // conf.set("TestPath", args[2]);
            
            JobClient.runJob(conf);
    }
      
}


// 크리에이티브 커먼즈 라이선스
// Creative Commons License
// Posted by hannami