当前位置:首页 > PHP教程 > php高级应用 > 列表

php与python 线程池多线程爬虫的例子

发布:smiling 来源: PHP粉丝网  添加日期:2018-10-31 22:56:37 浏览: 评论:0 

php例子:

  1. <?php 
  2.  
  3. class Connect extends Worker  //worker模式 
  4.  
  5. public function __construct() 
  6.  
  7.  
  8. public function getConnection() 
  9. if (!self::$ch
  10. self::$ch = curl_init(); 
  11. curl_setopt(self::$ch, CURLOPT_TIMEOUT, 2); 
  12. curl_setopt(self::$ch, CURLOPT_RETURNTRANSFER, 1); 
  13. curl_setopt(self::$ch, CURLOPT_HEADER, 0); 
  14. curl_setopt(self::$ch, CURLOPT_NOSIGNAL, true); 
  15. curl_setopt(self::$ch, CURLOPT_USERAGENT, "Firefox"); 
  16. curl_setopt(self::$ch, CURLOPT_FOLLOWLOCATION, 1); 
  17.  
  18. /* do some exception/error stuff here maybe */ 
  19.  
  20. return self::$ch
  21.  
  22. public function closeConnection() 
  23. curl_close(self::$ch); 
  24.  
  25. /** 
  26. * Note that the link is stored statically, which for pthreads, means thread local 
  27. * */ 
  28. protected static $ch
  29.  
  30.  
  31. class Query extends Threaded 
  32.  
  33. public function __construct($url
  34. $this->url = $url
  35.  
  36. public function run() 
  37. $ch = $this->worker->getConnection(); 
  38. curl_setopt($ch, CURLOPT_URL, $this->url); 
  39. $page = curl_exec($ch); 
  40. $info = curl_getinfo($ch); 
  41. $error = curl_error($ch); 
  42. $this->deal_data($this->url, $page$info$error); 
  43.  
  44. $this->result = $page
  45.  
  46. function deal_data($url$page$info$error
  47. $parts = explode("."$url); 
  48.  
  49. $id = $parts[1]; 
  50. if ($info['http_code'] != 200) 
  51. $this->show_msg($id$error); 
  52. else 
  53. $this->show_msg($id"OK"); 
  54.  
  55. function show_msg($id$msg
  56. echo $id."\t$msg\n"
  57.  
  58. public function getResult() 
  59. return $this->result; 
  60.  
  61. protected $url
  62. protected $result
  63.  
  64.  
  65. function check_urls_multi_pthreads() 
  66. global $check_urls;  //定义抓取的连接 
  67. $check_urls = array'http://xxx.com' => "xx网",); 
  68. $pool = new Pool(10, "Connect"array()); //建立10个线程池 
  69. foreach ($check_urls as $url => $name
  70. $pool->submit(new Query($url)); 
  71. $pool->shutdown(); 
  72.  
  73. check_urls_multi_pthreads(); 

python 多线程

  1. def handle(sid)://这个方法内执行爬虫数据处理 
  2.  
  3. pass 
  4. class MyThread(Thread): 
  5. """docstring for ClassName""" 
  6. def __init__(self, sid): 
  7. Thread.__init__(self) 
  8. self.sid = sid 
  9.  
  10. def run(): 
  11. handle(self.sid) 
  12.  
  13. threads = [] 
  14. for i in xrange(1,11): 
  15. t = MyThread(i) 
  16. threads.append(t) 
  17. t.start() 
  18.  
  19. for t in threads: 
  20. t.join() 

python 线程池爬虫

  1. from queue import Queue  
  2. from threading import Thread, Lock 
  3. import urllib.parse 
  4. import socket 
  5. import re 
  6. import time 
  7.  
  8. seen_urls = set(['/']) 
  9. lock = Lock() 
  10.  
  11.  
  12. class Fetcher(Thread): 
  13.     def __init__(self, tasks): 
  14.         Thread.__init__(self) 
  15.         self.tasks = tasks 
  16.         self.daemon = True 
  17.  
  18.         self.start() 
  19.  
  20.     def run(self): 
  21.         while True: 
  22.             url = self.tasks.get() 
  23.             print(url) 
  24.             sock = socket.socket() 
  25.             sock.connect(('localhost', 3000)) 
  26.             get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) 
  27.             sock.send(get.encode('ascii')) 
  28.             response = b'' 
  29.             chunk = sock.recv(4096) 
  30.             while chunk: 
  31.                 response += chunk 
  32.                 chunk = sock.recv(4096) 
  33.  
  34.             links = self.parse_links(url, response) 
  35.  
  36.             lock.acquire() 
  37.             for link in links.difference(seen_urls): 
  38.                 self.tasks.put(link) 
  39.             seen_urls.update(links)     
  40.             lock.release() 
  41.  
  42.             self.tasks.task_done() 
  43.  
  44.     def parse_links(self, fetched_url, response): 
  45.         if not response: 
  46.             print('error: {}'.format(fetched_url)) 
  47.             return set() 
  48.         if not self._is_html(response): 
  49.             return set() 
  50.         urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)'''
  51.                               self.body(response))) 
  52.  
  53.         links = set() 
  54.         for url in urls: 
  55.             normalized = urllib.parse.urljoin(fetched_url, url) 
  56.             parts = urllib.parse.urlparse(normalized) 
  57.             if parts.scheme not in ('''http''https'): 
  58.                 continue 
  59.             host, port = urllib.parse.splitport(parts.netloc) 
  60.             if host and host.lower() not in ('localhost'): 
  61.                 continue 
  62.             defragmented, frag = urllib.parse.urldefrag(parts.path) 
  63.             links.add(defragmented) 
  64.  
  65.         return links 
  66.  
  67.     def body(self, response): 
  68.         body = response.split(b'\r\n\r\n', 1)[1] 
  69.         return body.decode('utf-8'
  70.  
  71.     def _is_html(self, response): 
  72.         head, body = response.split(b'\r\n\r\n', 1) 
  73.         headers = dict(h.split(': 'for h in head.decode().split('\r\n')[1:]) 
  74.         return headers.get('Content-Type''').startswith('text/html'
  75.  
  76.  
  77. class ThreadPool: 
  78.     def __init__(self, num_threads): 
  79.         self.tasks = Queue() 
  80.         for _ in range(num_threads): 
  81.             Fetcher(self.tasks) 
  82.  
  83.     def add_task(self, url): 
  84.         self.tasks.put(url) 
  85.  
  86.     def wait_completion(self): 
  87.         self.tasks.join() 
  88.  
  89. if __name__ == '__main__'
  90.     start = time.time() 
  91.     pool = ThreadPool(4) 
  92.     pool.add_task("/"
  93.     pool.wait_completion() 
  94.     print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start)) 

Tags: python php线程池

分享到: