有个需求场景是这样的,使用redis控制scrapy运行的数量。设置系统后台为4后,scrapy最多只能启动4个任务,多余的任务将被排队等待。
概况
最近做了一个django + scrapy + celery + redis 的爬虫系统,客户购买的主机除了跑其他程序外,还要跑我开发的这套程序,所以需要手动控制scrapy的实例数量,避免过多的爬虫给系统造成负担。
流程设计
1、爬虫任务由用户以请求的方式发起,所有的用户的请求统一进入到celery进行排队;
2、任务数量控制的执行就交给reids,经由celery保存到redis,包含了爬虫启动所需要的必要信息,从redis取一条信息即可启动一个爬虫;
3、通过scrapyd的接口来获取当前在运行的爬虫数量,以便决定下一步流程:如果小于4,则从redis中取相应数量的信息来启动爬虫,如果大于等于4,则继续等待;
4、如果在运行爬虫的数量有所减少,则及时从reids中取相应数量的信息来启动爬虫。
代码实现
业务代码有点复杂和啰嗦,此处使用伪代码来演示
import redis # 实例化一个redis连接池 pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='') r = redis.Redis(connection_pool=pool) # 爬虫实例限制为4 即只允许4个scrapy实例在运行 limited = 4 # 声明redis的乐观锁 lock = r.Lock() # lock.acquire中有while循环,即它会线程阻塞,直到当前线程获得redis的lock,才会继续往下执行代码 if lock.acquire(): # 1、从reids中取一条爬虫信息 info = redis.get() # 2、while循环监听爬虫运行的数量 while True: req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json() # 统计当前有多少个爬虫在运行 running = req.get('running') + req.get('pending') # 3、判断是否等待还是要增加爬虫数量 # 3.1 如果在运行的数量大于等于设置到量 则继续等待 if running >= limited: continue # 3.2 如果小于 则启动爬虫 start_scrapy(info) # 3.3 将info从redis中删除 redis.delete(info) # 3.4 释放锁 lock.release() break
当前,这只是伪代码而已,实际的业务逻辑可能是非常复杂的,如:
@shared_task def scrapy_control(key_uuid): r = redis.Redis(connection_pool=pool) db = MysqlDB() speed_limited = db.fetch_config('REPTILE_SPEED') speed_limited = int(speed_limited[0]) keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM') keywords_num = int(keywords_num[0]) # while True: lock = r.lock('lock') with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 进入处理环节' + '\n') try: # acquire默认阻塞 如果获取不到锁时 会一直阻塞在这个函数的while循环中 if lock.acquire(): with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁' + '\n') # 1 从redis中获取信息 redis_obj = json.loads(r.get(key_uuid)) user_id = redis_obj.get('user_id') contents = redis_obj.get('contents') # 2 使用while循环处理核心逻辑 is_hold_print = True while True: req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json() running = req.get('running') + req.get('pending') # 3 如果仍然有足够的爬虫在运行 则hold住redis锁,等待有空余的爬虫位置让出 if running >= speed_limited: if is_hold_print: with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬虫在运行,线程等待中' + '\n') is_hold_print = False time.sleep(1) continue # 4 有空余的爬虫位置 则往下走 # 4.1 处理完所有的内容后 释放锁 if len(contents) == 0: r.delete(key_uuid) with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任务已完成,从redis中删除' + '\n') lock.release() with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 释放锁' + '\n') break # 4.2 创建task任务 task_uuid = str(uuid.uuid4()) article_obj = contents.pop() article_id = article_obj.get('article_id') article = article_obj.get('content') try: Task.objects.create( task_uuid = task_uuid, user_id = user_id, article_id = article_id, content = article ) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 创建Task出错: ' + str(e) + '\n') # finally: # 4.3 启动爬虫任务 即便创建task失败也会启动 try: task_chain(user_id, article, task_uuid, keywords_num) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 启动任务链失败: ' + str(e) + '\n') # 加入sleep 防止代码执行速度快于爬虫启动速度而导致当前线程启动额外的爬虫 time.sleep(5) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁之后的操作出错: ' + str(e) + '\n') lock.release()
小坑
scrapy启动速度相对较慢,所以while循环中,代码中执行到了爬虫的启动,需要sleep一下再去通过scrapyd接口获取爬虫运行的数量,如果立刻读取,可能会造成误判。