基于 python 的百度云网盘资源搜索引擎设计架构

2016-05-09 16:54:44 +08:00
 dajiji

大家都知道百度云网盘上有很多分享的资源,包括软件、各类视频自学教程、电子书、甚至各种电影、 BT 种子应有尽有,但百度云却没有提供相应的搜索功能。个人平时要找一些软件、美剧觉得非常蛋疼。于是就尝试开发一个百度云资源的搜索系统。 资源爬虫思路: 搜索引擎么最重要的就是有海量的资源了,有了资源,只要再基于资源实现全文检索功能就是一个简单的搜索引擎了。首先我需要爬取百度云的分享资源,爬取思路,打开任意一个百度云分享者的主页 yun.baidu.com/share/home?uk=xxxxxx&view=share#category/type=0,你可以发现分享者有订阅者和粉丝,你可以递归遍历订阅者和粉丝,从而获得大量分享者 uk ,进而获得大量的分享资源。 系统实现环境: 语言: python 操作系统: Linux 其他中间件: nginx mysql sphinx 系统包括几个独立的部分: 1 、基于 requests 实现的独立资源爬虫 2 、基于开源全文检索引擎 sphinx 实现的资源索引程序 3 、基于 Django+bootstrap3 开发的简易网站,网站搭建采用 nginx1.8+fastCGI(flup)+python 。 演示网站 http://www.itjujiao.com PS: 目前爬虫爬取了 4000W 左右的数据, sphinx 对内存的要求实在太大了,巨坑。 百度会对爬虫做 ip 限制,写了个简单的 xicidaili 代理采集程序, requests 可以配置 http 代理。 分词是 sphinx 自带的实现,支持中文分词,中文基于一元分词,有点过度分词,分词效果不是特别理想,比如我搜关键词“叶问 3 ”出现的结果中会有“叶子的问题第 3 版”,不符合预期。英文分词有很多可以改善的地方,比如我搜 xart 不会出现 x-art 的结果,而实际上 x-art 却也是我想要的结果集(你们懂的)。 数据库是 mysql ,资源表,考虑单表记录上限,分了 10 个表。第一次爬完 sphinx 做全量索引,后续做增量索引。 后续优化: 1 、分词处理,目前分词搜索结果不是很理想,有大神可以指点下思路 2 、数据去重,目前发现抓取的数据很多是共享资源,后续考虑基于 MD5 去重

爬虫部分实现代码(只是思路代码有点乱): #coding: utf8

import re import urllib2 import time from Queue import Queue import threading, errno, datetime import json import requests import MySQLdb as mdb

DB_HOST = '127.0.0.1' DB_USER = 'root' DB_PASS = ''

re_start = re.compile(r'start=(\d+)') re_uid = re.compile(r'query_uk=(\d+)') re_pptt = re.compile(r'&pptt=(\d+)') re_urlid = re.compile(r'&urlid=(\d+)')

ONEPAGE = 20 ONESHAREPAGE = 20

URL_SHARE = 'http://yun.baidu.com/pcloud/feed/getsharelist?auth_type=1&start={start}&limit=20&query_uk={uk}&urlid={id}' URL_FOLLOW = 'http://yun.baidu.com/pcloud/friend/getfollowlist?query_uk={uk}&limit=20&start={start}&urlid={id}' URL_FANS = 'http://yun.baidu.com/pcloud/friend/getfanslist?query_uk={uk}&limit=20&start={start}&urlid={id}'

QNUM = 1000 hc_q = Queue(20) hc_r = Queue(QNUM)

success = 0 failed = 0

PROXY_LIST = [[0, 10, "42.121.33.160", 809, "", "", 0], [5, 0, "218.97.195.38", 81, "", "", 0], ]

def req_worker(inx): s = requests.Session() while True: req_item = hc_q.get()

    req_type = req_item[0]
    url = req_item[1]
    r = s.get(url)
    hc_r.put((r.text, url))
    print "req_worker#", inx, url

def response_worker(): dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'baiduyun', charset='utf8') dbcurr = dbconn.cursor() dbcurr.execute('SET NAMES utf8') dbcurr.execute('set global wait_timeout=60000') while True:

    metadata, effective_url = hc_r.get()
    #print "response_worker:", effective_url
    try:
        tnow = int(time.time())
        id = re_urlid.findall(effective_url)[0]
        start = re_start.findall(effective_url)[0]
        if True:
            if 'getfollowlist' in effective_url: #type = 1
                follows = json.loads(metadata)
                uid = re_uid.findall(effective_url)[0]
                if "total_count" in follows.keys() and follows["total_count"]>0 and str(start) == "0":
                    for i in range((follows["total_count"]-1)/ONEPAGE):
                        try:
                            dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status) VALUES(%s, %s, %s, 1, 0)' % (uid, str(ONEPAGE*(i+1)), str(ONEPAGE)))
                        except Exception as ex:
                            print "E1", str(ex)
                            pass
                
                if "follow_list" in follows.keys():
                    for item in follows["follow_list"]:
                        try:
                            dbcurr.execute('INSERT INTO user(userid, username, files, status, downloaded, lastaccess) VALUES(%s, "%s", 0, 0, 0, %s)' % (item['follow_uk'], item['follow_uname'], str(tnow)))
                        except Exception as ex:
                            print "E13", str(ex)
                            pass
                else:
                    print "delete 1", uid, start
                    dbcurr.execute('delete from urlids where uk=%s and type=1 and start>%s' % (uid, start))
            elif 'getfanslist' in effective_url: #type = 2
                fans = json.loads(metadata)
                uid = re_uid.findall(effective_url)[0]
                if "total_count" in fans.keys() and fans["total_count"]>0 and str(start) == "0":
                    for i in range((fans["total_count"]-1)/ONEPAGE):
                        try:
                            dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status) VALUES(%s, %s, %s, 2, 0)' % (uid, str(ONEPAGE*(i+1)), str(ONEPAGE)))
                        except Exception as ex:
                            print "E2", str(ex)
                            pass
                
                if "fans_list" in fans.keys():
                    for item in fans["fans_list"]:
                        try:
                            dbcurr.execute('INSERT INTO user(userid, username, files, status, downloaded, lastaccess) VALUES(%s, "%s", 0, 0, 0, %s)' % (item['fans_uk'], item['fans_uname'], str(tnow)))
                        except Exception as ex:
                            print "E23", str(ex)
                            pass
                else:
                    print "delete 2", uid, start
                    dbcurr.execute('delete from urlids where uk=%s and type=2 and start>%s' % (uid, start))
            else:
                shares = json.loads(metadata)
                uid = re_uid.findall(effective_url)[0]
                if "total_count" in shares.keys() and shares["total_count"]>0 and str(start) == "0":
                    for i in range((shares["total_count"]-1)/ONESHAREPAGE):
                        try:
                            dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status) VALUES(%s, %s, %s, 0, 0)' % (uid, str(ONESHAREPAGE*(i+1)), str(ONESHAREPAGE)))
                        except Exception as ex:
                            print "E3", str(ex)
                            pass
                if "records" in shares.keys():
                    for item in shares["records"]:
                        try:
                            dbcurr.execute('INSERT INTO share(userid, filename, shareid, status) VALUES(%s, "%s", %s, 0)' % (uid, item['title'], item['shareid']))
                        except Exception as ex:
                            #print "E33", str(ex), item
                            pass
                else:
                    print "delete 0", uid, start
                    dbcurr.execute('delete from urlids where uk=%s and type=0 and start>%s' % (uid, str(start)))
            dbcurr.execute('delete from urlids where id=%s' % (id, ))
            dbconn.commit()
    except Exception as ex:
        print "E5", str(ex), id

    
    pid = re_pptt.findall(effective_url)
    
    if pid:
        print "pid>>>", pid
        ppid = int(pid[0])
        PROXY_LIST[ppid][6] -= 1
dbcurr.close()
dbconn.close()

def worker(): global success, failed dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'baiduyun', charset='utf8') dbcurr = dbconn.cursor() dbcurr.execute('SET NAMES utf8') dbcurr.execute('set global wait_timeout=60000') while True:

    #dbcurr.execute('select * from urlids where status=0 order by type limit 1')
    dbcurr.execute('select * from urlids where status=0 and type>0 limit 1')
    d = dbcurr.fetchall()
    #print d
    if d:
        id = d[0][0]
        uk = d[0][1]
        start = d[0][2]
        limit = d[0][3]
        type = d[0][4]
        dbcurr.execute('update urlids set status=1 where id=%s' % (str(id),))
        url = ""
        if type == 0:
            url = URL_SHARE.format(uk=uk, start=start, id=id).encode('utf-8')
        elif  type == 1:
            url = URL_FOLLOW.format(uk=uk, start=start, id=id).encode('utf-8')
        elif type == 2:
            url = URL_FANS.format(uk=uk, start=start, id=id).encode('utf-8')
        if url:
            hc_q.put((type, url))
            
        #print "processed", url
    else:
        dbcurr.execute('select * from user where status=0 limit 1000')
        d = dbcurr.fetchall()
        if d:
            for item in d:
                try:
                    dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 0, 0)' % (item[1], str(ONESHAREPAGE)))
                    dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 1, 0)' % (item[1], str(ONEPAGE)))
                    dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 2, 0)' % (item[1], str(ONEPAGE)))
                    dbcurr.execute('update user set status=1 where userid=%s' % (item[1],))
                except Exception as ex:
                    print "E6", str(ex)
        else:
            time.sleep(1)
            
    dbconn.commit()
dbcurr.close()
dbconn.close()

for item in range(16):
t = threading.Thread(target = req_worker, args = (item,)) t.setDaemon(True) t.start()

s = threading.Thread(target = worker, args = ()) s.setDaemon(True) s.start()

response_worker()

8699 次点击
所在节点    Python
13 条回复
SpicyCat
2016-05-09 17:17:11 +08:00
太长没看完,这个和用 google 的 site:pan.baidu.com 有什么区别?
songkaiape
2016-05-09 17:42:57 +08:00
感觉挺不错的,现在网上不少类似网站但是感觉都不是很好使
lxy
2016-05-09 17:45:50 +08:00
gamecreating
2016-05-09 21:21:18 +08:00
之前写过一个.....采集几千万上亿条数据....然并卵 现在压缩包还躺在百度网盘里
exuxu
2016-05-09 21:45:42 +08:00
@lxy 不错,不过背景图太蛋疼了,这网站能坚持多久?
Marfal
2016-05-09 21:57:20 +08:00
然而速度太慢下不了(误
Fing
2016-05-09 22:26:26 +08:00
如果只是为了获取百度网盘资源意义真的不大,因为 google 也会去爬百度云,其次,如果你那 4000 数据中被用户取消分享,所以你还是需要反复去爬, http://baiduyun.ren/,这是我搞的一个百度网盘搜索,直接拿的 Google 爬的结果,你可以对比下你爬的数据是否有被覆盖 @-@
uglyer
2016-05-09 22:30:36 +08:00
🌚看着怎么像毕业设计呀。
jeremaihloo
2016-05-09 22:31:34 +08:00
这帖子的内容,强迫症受不了
YUX
2016-05-09 22:35:51 +08:00
在线播放给好评
dajiji
2016-05-10 08:48:31 +08:00
晕啊。不知怎么排版。我自己都有强迫症
huayu
2016-05-10 18:08:20 +08:00
pancts
2016-05-19 09:23:56 +08:00
@exuxu 233 panc 作者就是我 已经坚持两年了 最长闭站时间不超过一天

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/277380

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX