DHT magnet crawler source code, in Python (if magnet links are new to you, look them up first)

2016-08-22 09:46:35 +08:00
 3023369823

Back when I was writing the Baidu Netdisk crawler and the Baidu Images crawler, I promised readers I would find time to open up the source of ok 搜搜, and it's time to keep that promise. Below is the crawler's complete code, fully and thoroughly public; you can use it whether or not you can program. Just install a Linux system on a machine with a public IP first, then run:

python startCrawler.py

A reminder: every database field appears in the code, so build the table yourself; it's simple enough that I won't walk through it, though a sketch of the schema follows below. I'm also providing download links; the full source is at: download link 1, download link 2
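
If you'd rather not reverse-engineer the queries, a table along these lines should satisfy every SELECT, UPDATE and INSERT in the code below. The column types and lengths here are my guesses, not the original schema, so adjust to taste:

#!/usr/bin/env python
# encoding: utf-8
# Sketch: create the search_hash table the crawler expects.
# Types and lengths are guesses inferred from the queries, not the original schema.
import MySQLdb as mdb

conn = mdb.connect('127.0.0.1', 'root', 'root', 'oksousou', charset='utf8')
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS search_hash (
  id          INT UNSIGNED NOT NULL AUTO_INCREMENT,
  info_hash   CHAR(40) NOT NULL,   -- hex-encoded 20-byte infohash
  category    VARCHAR(32),
  data_hash   CHAR(32),            -- md5 of the torrent's pieces field
  name        VARCHAR(255),
  extension   VARCHAR(32),
  classified  TINYINT(1) DEFAULT 0,
  source_ip   VARCHAR(15),
  tagged      TINYINT(1) DEFAULT 0,
  length      BIGINT,
  create_time DATETIME,
  last_seen   DATETIME,
  requests    INT DEFAULT 1,
  PRIMARY KEY (id),
  UNIQUE KEY uk_info_hash (info_hash)
) DEFAULT CHARSET=utf8
""")
conn.commit()
conn.close()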

#!/usr/bin/env python
# encoding: utf-8
"""
author:haoning
create time:2015.8.1
"""
import hashlib
import os
import time
import datetime
import traceback
import sys
import random
import json
import socket
import threading
from hashlib import sha1 # SHA-1 is used to hash random bytes into node IDs
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
from collections import deque
from Queue import Queue

import MySQLdb as mdb  # MySQL connector

import metautils
import downloadTorrent
from bencode import bencode, bdecode
import pygeoip

DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASS = 'root'

BOOTSTRAP_NODES = (
  ("67.215.246.10", 6881),
  ("82.221.103.244", 6881),
  ("23.21.224.150", 6881)
)
RATE = 1 # throttle: handle one in every RATE incoming queries
TID_LENGTH = 2
RE_JOIN_DHT_INTERVAL = 3
TOKEN_LENGTH = 2
INFO_HASH_LEN = 500000 # cap the info_hash queue at 500k entries, small enough to keep memory bounded
CACHE_LEN = 100 # size of the write-back cache for database updates
WAIT_DOWNLOAD = 80


geoip = pygeoip.GeoIP('GeoIP.dat') # MaxMind's legacy country database, expected in the working directory

def is_ip_allowed(ip):
  # keep only peers whose IP geolocates to East Asia
  country = geoip.country_code_by_addr(ip)
  return country in ('CN', 'TW', 'JP', 'HK', 'KR')

def entropy(length):
  return "".join(chr(randint(0, 255)) for _ in xrange(length))

def random_id():
  h = sha1()
  h.update(entropy(20))
  return h.digest()


def decode_nodes(nodes):
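  # compact node format (BEP 5): 26 bytes per node = 20-byte ID + 4-byte IPv4 + 2-byte big-endian port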
  n = []
  length = len(nodes)
  if (length % 26) != 0:
      return n

  for i in range(0, length, 26):
      nid = nodes[i:i+20]
      ip = inet_ntoa(nodes[i+20:i+24])
      port = unpack("!H", nodes[i+24:i+26])[0]
      n.append((nid, ip, port))

  return n


def timer(t, f):
  Timer(t, f).start()


def get_neighbor(target, nid, end=10):
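  # forge an ID whose first 'end' bytes come from target, so peers believe we sit near target in the DHT keyspace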
  return target[:end]+nid[end:]


class KNode(object):

  def __init__(self, nid, ip, port):
      self.nid = nid
      self.ip = ip
      self.port = port


class DHTClient(Thread):

  def __init__(self, max_node_qsize):
      Thread.__init__(self)
      self.setDaemon(True)
      self.max_node_qsize = max_node_qsize
      self.nid = random_id()
      self.nodes = deque(maxlen=max_node_qsize)

  def send_krpc(self, msg, address):
      try:
          self.ufd.sendto(bencode(msg), address)
      except Exception:
          pass

  def send_find_node(self, address, nid=None):
      nid = get_neighbor(nid, self.nid) if nid else self.nid
      tid = entropy(TID_LENGTH)
      msg = {
          "t": tid,
          "y": "q",
          "q": "find_node",
          "a": {
              "id": nid,
              "target": random_id()
          }
      }
      self.send_krpc(msg, address)

  def join_DHT(self):
      for address in BOOTSTRAP_NODES:
          self.send_find_node(address)

  def re_join_DHT(self):
      if len(self.nodes) == 0:
          self.join_DHT()
      timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

  def auto_send_find_node(self):
      wait = 1.0 / self.max_node_qsize
      while True:
          try:
              node = self.nodes.popleft()
              self.send_find_node((node.ip, node.port), node.nid)
          except IndexError:
              pass
          try:
              sleep(wait)
          except KeyboardInterrupt:
              os._exit(0)

  def process_find_node_response(self, msg, address):
      nodes = decode_nodes(msg["r"]["nodes"])
      for node in nodes:
          (nid, ip, port) = node
          if len(nid) != 20: continue
          if ip == self.bind_ip: continue
          n = KNode(nid, ip, port)
          self.nodes.append(n)


class DHTServer(DHTClient): # answers peers' queries and harvests info_hashes

  def __init__(self, master, bind_ip, bind_port, max_node_qsize):
      DHTClient.__init__(self, max_node_qsize)

      self.master = master
      self.bind_ip = bind_ip
      self.bind_port = bind_port
      self.speed=0

      self.process_request_actions = {
          "get_peers": self.on_get_peers_request,
          "announce_peer": self.on_announce_peer_request,
      }

      self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
      self.ufd.bind((self.bind_ip, self.bind_port))

      timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)


  def run(self):
      self.re_join_DHT()
      while True:
          try:
              (data, address) = self.ufd.recvfrom(65536)
              msg = bdecode(data)
              self.on_message(msg, address)
          except Exception:
              pass

  def on_message(self, msg, address):
      global RATE # module-level throttle, retuned below as traffic grows
      try:
          if msg["y"] == "r":
              if msg["r"].has_key("nodes"):
                  self.process_find_node_response(msg, address) # the response carries fresh nodes
          elif msg["y"] == "q":
              try:
                  self.speed+=1
                  if self.speed % 10000 ==0:
                      RATE=random.randint(1,3)
                      if RATE==2:
                          RATE=1
                      if RATE==3:
                          RATE=10
                      if self.speed>100000:
                          self.speed=0
                  if self.speed % RATE == 0: # heavy traffic eats CPU, so only handle one in every RATE queries (RATE cycles through 1, 1, 10)
                      self.process_request_actions[msg["q"]](msg, address) # dispatch the query; this is where info_hashes are harvested
              except KeyError:
                  self.play_dead(msg, address)
      except KeyError:
          pass

  def on_get_peers_request(self, msg, address):
      try:
          infohash = msg["a"]["info_hash"]
          tid = msg["t"]
          nid = msg["a"]["id"]
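          # the token is a slice of the infohash; announce_peer must echo it back (verified below)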
          token = infohash[:TOKEN_LENGTH]
          msg = {
              "t": tid,
              "y": "r",
              "r": {
                  "id": get_neighbor(infohash, self.nid),
                  "nodes": "",
                  "token": token
              }
          }
          self.master.log(infohash, address)
          self.send_krpc(msg, address)
      except KeyError:
          pass

  def on_announce_peer_request(self, msg, address):
      try:
          infohash = msg["a"]["info_hash"]
          token = msg["a"]["token"]
          nid = msg["a"]["id"]
          tid = msg["t"]

          if infohash[:TOKEN_LENGTH] == token:
              if msg["a"].has_key("implied_port ") and msg["a"]["implied_port "] != 0:
                  port = address[1]
              else:
                  port = msg["a"]["port"]
              self.master.log_announce(infohash, (address[0], port))
      except Exception:
          print 'error'
      finally:
          self.ok(msg, address)

  def play_dead(self, msg, address):
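      # answer queries we don't serve with KRPC error 202, so the peer gives up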
      try:
          tid = msg["t"]
          msg = {
              "t": tid,
              "y": "e",
              "e": [202, "Server Error"]
          }
          self.send_krpc(msg, address)
      except KeyError:
          pass

  def ok(self, msg, address):
      try:
          tid = msg["t"]
          nid = msg["a"]["id"]
          msg = {
              "t": tid,
              "y": "r",
              "r": {
                  "id": get_neighbor(nid, self.nid)
              }
          }
          self.send_krpc(msg, address)
      except KeyError:
          pass


class Master(Thread): # resolves info_hashes into torrent metadata and stores them

  def __init__(self):
      Thread.__init__(self)
      self.setDaemon(True)
      self.queue = Queue()
      self.cache = Queue()
      self.count=0
      self.mutex = threading.RLock() # reentrant lock: the holding thread may acquire it again without deadlocking
      self.waitDownload = Queue()
      self.metadata_queue = Queue()
      self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8')
      self.dbconn.autocommit(False)
      self.dbcurr = self.dbconn.cursor()
      self.dbcurr.execute('SET NAMES utf8')
      self.visited = set()
               
  def lock(self): # acquire the shared lock
      self.mutex.acquire()

  def unlock(self): # release the shared lock
      self.mutex.release()
       
  def work(self,item):

      print "start thread",item
      while True:
          self.prepare_download_metadata()
          self.lock()
          self.download_metadata()
          self.unlock()

          self.lock()
          self.got_torrent()
          self.unlock()
                   
  def start_work(self, num_threads):

      for item in xrange(num_threads):
          t = threading.Thread(target=self.work, args=(item,))
          t.setDaemon(True)
          t.start()
       
  # queueing infohashes keeps the DHT thread fast; worker threads drain the queue later
  def log_announce(self, binhash, address=None):
      if self.queue.qsize() < INFO_HASH_LEN: # past INFO_HASH_LEN, drop new hashes or the workers can never catch up
          if is_ip_allowed(address[0]):
              self.queue.put([address, binhash]) # enqueue the info_hash
       
  def log(self, infohash, address=None):
      if self.queue.qsize() < INFO_HASH_LEN: # same cap as log_announce
          if is_ip_allowed(address[0]):
              self.queue.put([address, infohash])
   
  def prepare_download_metadata(self):

      if self.queue.qsize() == 0:
          sleep(2)
      # pull an info_hash off the queue for downloading
      address, binhash = self.queue.get()
      if binhash in self.visited:
          return
      if len(self.visited) > 100000: # past 100000 entries, reset the visited set rather than let it grow forever
          self.visited = set()
      self.visited.add(binhash) # mark this info_hash as seen
      utcnow = datetime.datetime.utcnow()

      self.cache.put((address, binhash, utcnow)) # hand it to the write-back cache queue
   
  def download_metadata(self):
   
      if self.cache.qsize() > CACHE_LEN/2: # flush once the cache is half full
          while self.cache.qsize() > 0: # drain the cache
              address,binhash,utcnow = self.cache.get()
              info_hash = binhash.encode('hex')
              self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
              y = self.dbcurr.fetchone()
              if y:
                  # already known: refresh last-seen time and bump the request count
                  self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
              else:
                  self.waitDownload.put((address, binhash))
          self.dbconn.commit()
          if self.waitDownload.qsize() > WAIT_DOWNLOAD:
              while self.waitDownload.qsize() > 0:
                  address,binhash = self.waitDownload.get()
                  t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
                  t.setDaemon(True)
                  t.start()

  def decode(self, s):
      if type(s) is list:
          s = ';'.join(s)
      u = s
      for x in (self.encoding, 'utf8', 'gbk', 'big5'):
          try:
              u = s.decode(x)
              return u
          except:
              pass
      return s.decode(self.encoding, 'ignore')

  def decode_utf8(self, d, i):
      if i+'.utf-8' in d:
          return d[i+'.utf-8'].decode('utf8')
      return self.decode(d[i])
   
  def parse_metadata(self, data): # parse the torrent's metadata into a dict for the database
      info = {}
      self.encoding = 'utf8'
      try:
          torrent = bdecode(data) # bdecode the raw metadata first
          if not torrent.get('name'):
              return None
      except:
          return None
      detail = torrent
      info['name'] = self.decode_utf8(detail, 'name')
      if 'files' in detail:
          info['files'] = []
          for x in detail['files']:
              if 'path.utf-8' in x:
                  v = {'path': self.decode('/'.join(x['path.utf-8'])), 'length': x['length']}
              else:
                  v = {'path': self.decode('/'.join(x['path'])), 'length': x['length']}
              if 'filehash' in x:
                  v['filehash'] = x['filehash'].encode('hex')
              info['files'].append(v)
          info['length'] = sum([x['length'] for x in info['files']])
      else:
          info['length'] = detail['length']
      info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
      return info

  def got_torrent(self):
      if self.metadata_queue.qsize() == 0:
          return
      binhash, address, data,start_time = self.metadata_queue.get()
      if not data:
          return
      try:
          info = self.parse_metadata(data)
          if not info:
              return
      except:
          traceback.print_exc()
          return

      temp = time.time()
      x = time.localtime(float(temp))
      utcnow = time.strftime("%Y-%m-%d %H:%M:%S", x) # current local time, despite the variable's name
       
      info_hash = binhash.encode('hex') # hex infohash: the key of the magnet link
      info['info_hash'] = info_hash
      # need to build tags
      info['tagged'] = False
      info['classified'] = False
      info['requests'] = 1
      info['last_seen'] = utcnow
      info['create_time'] = utcnow
      info['source_ip'] = address[0]
       
      if info.get('files'):
          files = [z for z in info['files'] if not z['path'].startswith('_')]
          if not files:
              files = info['files']
      else:
          files = [{'path': info['name'], 'length': info['length']}]
      files.sort(key=lambda z:z['length'], reverse=True)
      bigfname = files[0]['path']
      info['extension'] = metautils.get_extension(bigfname).lower()
      info['category'] = metautils.get_category(info['extension'])

      try:
          try:
              print '\n', 'Saved', info['info_hash'], info['name'], (time.time()-start_time), 's', address[0]
          except:
              print '\n', 'Saved', info['info_hash']
          ret = self.dbcurr.execute('INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' + 
              'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
              (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'],
              info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests']))
          self.count += 1
          if self.count % 50 == 0: # commit in batches of 50 inserts
              self.dbconn.commit()
              if self.count > 100000:
                  self.count = 0
      except:
          print self.name, 'save error', info
          traceback.print_exc()
          return

if __name__ == "__main__":
   
  # start the master: the worker threads that download and store metadata
  master = Master()
  master.start_work(150)
   
  # start the DHT server
  dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
  dht.start()
  dht.auto_send_find_node()
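
One caveat: metautils and downloadTorrent are separate files in the full download above and would not fit in this post, so the script as pasted will not run on its own. For reference, here is my guess at their interfaces, reconstructed from the call sites above; these are placeholder stubs, not the real implementations:

# metautils stub -- interface inferred from got_torrent() above, not the original code
import os

def get_extension(filename):
  # '.mkv' for 'movie.mkv', '' when there is no extension
  return os.path.splitext(filename)[1]

def get_category(extension):
  # the real mapping is surely richer; this only illustrates the contract
  video = ('.avi', '.mkv', '.mp4', '.rmvb', '.wmv')
  return 'video' if extension.lower() in video else 'other'

# downloadTorrent.download_metadata(address, binhash, metadata_queue) is expected
# to fetch the torrent's metadata from the announcing peer (presumably over the
# BEP 9 ut_metadata extension) and call
#     metadata_queue.put((binhash, address, data, start_time))
# where data is the raw bencoded metadata or None on failure -- exactly the
# tuple that got_torrent() unpacks.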

Note: sorry, a V2EX topic can't exceed 20,000 characters, so I can't paste any further. This was first published on cnblogs; here is the link if you want to read the rest, or just grab the source from the download links at the top and study it at your leisure: http://www.cnblogs.com/huangxie/p/5550680.html This crawler cost me, and other experts around the net, a good deal of time, so I hope everyone reading keeps up the spirit of digging in, of open source, of discussing and sharing. I've set up a QQ group as the official group of 去转盘网; it isn't big yet, so come hang out if you're interested. Every follower makes 去转盘 a little livelier. QQ group: 512245829

4841 views
Node: Python
6 replies
snsd
2016-08-22 10:24:12 +08:00
OP, if it's not too much trouble, could you build an online resolver for iQiyi 1080p videos?
3023369823
2016-08-22 10:44:02 +08:00
@snsd I haven't looked into that. If you have a good approach, do share.
Allianzcortex
2016-08-22 11:18:01 +08:00
I've been reading up on the DHT protocol lately too, planning to listen in real time. That said, the global variable and the bare excepts in OP's code could use some cleanup (no offense).
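For example, something along these lines would keep the throttle off the module level (just a sketch of the idea, not OP's code):

import random

class Throttle(object):
  # instance-held replacement for the global RATE/speed pair in on_message;
  # reproduces the same 1, 1, 10 behaviour without module-level state
  def __init__(self):
      self.rate = 1
      self.speed = 0

  def allow(self):
      self.speed += 1
      if self.speed % 10000 == 0:
          self.rate = {1: 1, 2: 1, 3: 10}[random.randint(1, 3)]
          if self.speed > 100000:
              self.speed = 0
      return self.speed % self.rate == 0

on_message would then do "if self.throttle.allow(): ..." instead of touching the global, and the bare excepts could name the exceptions they actually expect.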
3023369823
2016-08-22 11:21:40 +08:00
@Allianzcortex True, this code was written quite a while ago. I'll take another look when I have time. Thanks for pointing it out.
hmlbr
2016-08-22 17:34:07 +08:00
Will dig into it when I get a chance. Nice work, OP.
3023369823
2016-08-22 21:41:14 +08:00
@hmlbr Cool.
