基于asyncio 异步协程框架实现收集B站直播弹幕
作者:liyiyang 发布时间:2021-07-14 19:34:40
前言
虽然标题是全站,但目前只做了等级 top 100 直播间的全天弹幕收集。
弹幕收集系统基于之前的B 站直播弹幕姬 Python 版修改而来。具体协议分析可以看上一篇文章。
直播弹幕协议是直接基于 TCP 协议,所以如果 B 站对类似我这种行为做反制措施,比较困难。应该有我不知道的技术手段来检测类似我这种恶意行为。
我试过同时连接 100 个房间,和连接单个房间 100 次的实验,都没有问题。>150 会被关闭链接。
直播间的选取
现在弹幕收集系统在选取直播间上比较简单,直接选取了等级 top100。
以后会修改这部分,改成定时去 http://live.bilibili.com/all 查看新开播的直播间,并动态添加任务。
异步任务和弹幕存储
收集系统仍旧使用了 asyncio 异步协程框架,对于每一个直播间都使用如下方法来加进 loop 中。
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
其实若将心跳任务 HeartbeatLoop 放入 connectorServer 中去启动,代码看起来更优雅一些。但这么做是因为我需要维护一个任务列表,后面会有描述。
在弹幕存储上我花了些时间选择。
数据库存储是一个同步 IO 的过程,Insert 的时候会阻塞弹幕收集的任务。虽然有 aiomysql 这种异步接口,但配置数据库太麻烦,我的设想是这个小系统能够方便地部署。
最终我选择使用自带的 sqlite3。但 sqlite3 无法做并行操作,故开了一个线程单独进行数据库存储。在另一个线程中,100 * 2 个任务搜集所有的弹幕、人数信息,并塞进队列 commentq, numq 中。存储线程每隔 10s 唤醒一次,将队列中的数据写进 sqlite3 中,并清空队列。
在多线程和异步的配合下,网络流量没有被阻塞。
可能的连接失败场景处理
弹幕协议是直接基于 TCP,位与位直接关联性较强,一旦解析错误,很容易就抛 Exception(个人感觉,虽然 TCP 是可靠传输,但B站服务器自身发生错误也是有可能的)。所以有必要设计一个自动重连机制。
在 asyncio 文档中提到,
Done means either that a result / exception are available, or that the future was cancelled.
函数正常返回、抛出异常或者是被 cancel,都会退出当前任务。可以使用 done() 来判断。
每一个直播间对应两个任务,解析任务是最容易挂的,但并不会影响心跳任务,所以必须找出并将对应心跳任务结束。
在创建任务的时候使用字典记录每个房间的两个任务,
self.tasks[url] = [task1, task2]
在运行过程中,每隔 10s 做一次检查,
for url in self.tasks:
item = self.tasks[url]
task1 = item[0]
task2 = item[1]
if task1.done() == True or task2.done() == True:
if task1.done() == False:
task1.cancel()
if task2.done() == False:
task2.cancel()
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task11 = asyncio.ensure_future(danmuji.connectServer())
task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
self.tasks[url] = [task11, task22]
实际我只见过一次任务失败的场景,是因为主播房间被封了,导致无法进入直播间。
结论
B站人数是按照连接弹幕服务器的链接数量统计的。通过操纵链接量,可以瞬间增加任意人数观看,有商机?
运行的这几天中,发现即使大部分房间不在直播,也能有 >5 的人数,包括凌晨。我只能猜测也有和我一样的人在 24h 收集弹幕。
top100 平均一天 40M 弹幕数据。
收集的弹幕能做什么?还没想好,可能可以拿来做用户行为分析 -_^
最后附上本源码的GITHUB地址 https://github.com/lyyyuna/bilibili_danmu_colloector


猜你喜欢
- 本文实例为大家分享了pygame实现贪吃蛇游戏的具体代码,供大家参考,具体内容如下为了简化起见,游戏素材暂定为两张简单的图片(文中用的是30
- 一丶什么是索引索引是存储引擎快速找到记录的一种数据结构。数据库中的数据可以理解成字典中的单词,而索引就是目录,显而易见这是一种空间换时间的做
- python装饰器就是用于扩展原函数功能的一种函数,这个函数特殊的地方就是它的返回值也是一个函数,使用Python装饰器的一个好处就是:在不
- 如何在一个广告旗帜里轮番显示时间长度不一的不同广告?好了,下面就是Ad Rotator组件完整的应用例子:adrot.asp<html
- ucky-canvas 介绍一个基于 Js + Canvas 的【大转盘 & 九宫格 & * 】抽奖, 致力于为 web
- 随着计算机硬件的不断发展,多核CPU已经成为普及的硬件设备,利用多核CPU的优势可以有效的提高程序的执行效率。而多进程模式可以实现多核CPU
- 1. 安装yaml库想要使用python实现yaml与json格式互相转换,需要先下载pip,再通过pip安装yaml库。如何下载以及使用p
- strip_tags 去掉 HTML 及 PHP 的标记。 语法: string stri
- APScheduler是一个Python定时任务框架,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持
- 最近分别用vue和Android实现了一个券码复制功能,长按券码会在上方弹出一个拷贝的icon提示,点击icon将券码内容复制到剪贴板。现将
- 用vue写业务代码时候,后端大神丢给我一堆数据,要求是做全选,反选功能,然后把用户更改的数据全部返回给他基本思路如果父级选中了,那么父级下面
- 本文介绍了asp中 adpbe.stream 的语法,各种参数使用说明,方便大家查阅。更多请看:VBScript 速查手册(语言参考) ch
- 下载资源hadoop3.0.0spark-2.4.4-bin-without-hadoopwinutils下载(对应hadoop3.0.1的
- 1.下载mysql的repo源$ wget http://repo.mysql.com/mysql-community-release-el
- 开发web应用程序是一件非常辛苦的事情,你需要花大把大把的时间来做无数的事情。假如你不运用有条理的方法,尤其是在复杂的项目中,你会承受忽视项
- 让ASP搭配MYSQL所需要工具mysql-4.1.11-win32 myodbc-3.51.11-1-dll myodbc-3.51.11
- 引言欢迎来到我们的系列博客《Python全景系列》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握
- 0 环境Python版本:3.6.8系统版本:macOS MojavePython Jupyter Notebook1 引言七月了,大家最近
- 可用下面提供的强制登录的程序,它也可以用来做会员注册的。security.asp<%bLoggedIn =&nb
- git和svn有什么区别呢?git采用分布式版本库管理,而svn采用集中式版本库管理。集中式版本库管理需要有一台存放版本库的服务器,开发人员