使用Python爬取公号文章(上)
第一时间获取 Python 技术干货!
阅读文本大概需要 10 分钟。
场景:有时候我们想爬取某个大 V 的发布的全部的文章进行学习或者分析。
这个爬虫任务我们需要借助「 Charles 」这个抓包工具,设置好手机代理 IP 去请求某个页面,通过分析,模拟请求,获取到实际的数据。
我们要爬取文章的作者、文章标题、封面图、推送时间、文件内容、阅读量、点赞数、评论数、文章实际链接等数据,最后要把数据存储到「 MongoDB 」数据库中。
首先,在 PC 上下载 Charles,并获取本地的 IP 地址。
然后,手机连上同一个网段,并手动设置代理 IP,端口号默认填 8888 。最后配置 PC 和手机上的证书及 SSL Proxying,保证能顺利地抓到 HTTPS 的请求。具体的方法可以参考下面的文章。
「https://www.jianshu.com/p/595e8b556a60?from=timeline&isappinstalled=0 」
首先我们选中一个微信公众号,依次点击右上角的头像、历史消息,就可以进入到全部消息的主界面。默认展示的是前 10 天历史消息。
然后可以查看 Charles 抓取的请求数据,可以通过「 mp.weixin.qq.com 」去过滤请求,获取到消息首页发送的请求及请求方式及响应内容。
继续往下滚动页面,可以加载到下一页的数据,同样可以获取到请求和响应的数据。
爬取的数据最后要保存在 MongoDB 文档型数据库中,所以不需要建立数据模型,只需要安装软件和开启服务就可以了。MongoDB 的使用教程可以参考下面的链接:
「 https://www.jianshu.com/p/4c5deb1b7e7c 」
为了操作 MongoDB 数据库,这里使用「 MongoEngine 」这个类似于关系型数据库中的 ORM 框架来方便我们处理数据。
pip3 install mongoengine
从上面的分析中可以知道首页消息、更多页面消息的请求 URL 规律如下:
# 由于微信屏蔽的关键字, 字段 netloc + path 用 ** 代替
# 首页请求url
https://**?action=home&__biz=MzIxNzYxMTU0OQ==&scene=126&bizpsid=0&sessionid=1545633855&subscene=0&devicetype=iOS12.1.2&version=17000027&lang=zh_CN&nettype=WIFI&a8scene=0&fontScale=100&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wx_header=1
# 第二页请求url
https://**?action=getmsg&__biz=MzIxNzYxMTU0OQ==&f=json&offset=10&count=10&is_ok=1&scene=126&uin=777&key=777&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wxtoken=&appmsg_token=988_rETfljlGIZqE%252F6MobN1rEtqBx5Ai9wBDbbH_sw~~&x5=0&f=json
# 第三页请求url
https://**?action=getmsg&__biz=MzIxNzYxMTU0OQ==&f=json&offset=21&count=10&is_ok=1&scene=126&uin=777&key=777&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wxtoken=&appmsg_token=988_rETfljlGIZqE%252F6MobN1rEtqBx5Ai9wBDbbH_sw~~&x5=0&f=json
可以通过把 offset 设置为可变数据,请求所有页面的数据 URL可以写成下面的方式:
https://**?action=getmsg&__biz=MzIxNzYxMTU0OQ==&f=json&offset={}&count=10&is_ok=1&scene=126&uin=777&key=777&pass_ticket=U30O32QRMK6dba2iJ3ls6A3PRbrhksX%2B7D8pF3%2Bu3uXSKvSAa1hnHzfsSClawjKg&wxtoken=&appmsg_token=988_rETfljlGIZqE%252F6MobN1rEtqBx5Ai9wBDbbH_sw~~&x5=0&f=json
另外,通过 Charles 获取到请求头。由于微信的反爬机制,这里的 Cookie 和 Referer 有一定的时效性,需要定时更换。
self.headers = {
'Host': 'mp.weixin.qq.com',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 12_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/16B92 MicroMessenger/6.7.4(0x1607042c) NetType/WIFI Language/zh_CN',
'Accept-Language': 'zh-cn',
'X-Requested-With': 'XMLHttpRequest',
'Cookie': 'devicetype=iOS12.1; lang=zh_CN; pass_ticket=fXbGiNdtFY050x9wsyhMnmaSyaGbSIXNzubjPBqiD+c8P/2GyKpUSimrtIKQJsQt; version=16070430; wap_sid2=CMOw8aYBElx2TWQtOGJfNkp3dmZHb3dyRnpRajZsVlVGX0pQem4ycWZSNzNFRmY3Vk9zaXZUM0Y5b0ZpbThVeWgzWER6Z0RBbmxqVGFiQ01ndFJyN01LNU9PREs3OXNEQUFBfjC409ngBTgNQJVO; wxuin=349984835; wxtokenkey=777; rewardsn=; pac_uid=0_f82bd5abff9aa; pgv_pvid=2237276040; tvfe_boss_uuid=05faefd1e90836f4',
'Accept': '*/*',
'Referer': 'https://**?action=home&__biz=MzIxNzYxMTU0OQ==&scene=126&sessionid=1544890100&subscene=0&devicetype=iOS12.1&version=16070430&lang=zh_CN&nettype=WIFI&a8scene=0&fontScale=100&pass_ticket=pg%2B0C5hdqENXGO6Fq1rED9Ypx20C2vuodaL8DCwZwVe22sv9OtWgeL5YLjUujPOR&wx_header=1'
}
最后通过 requests 去模拟发送请求。
response = requests.get(current_request_url, headers=self.headers, verify=False)
result = response.json()
通过 Charles 返回的数据格式可以得知消息列表的数据存储在 general_msg_list 这个 Key 下面。因此可以需要拿到数据后进行解析操作。
has_next_page 字段可以判断是否存在下一页的数据;如果有下一页的数据,可以继续爬取,否则终止爬虫程序。
ps:由于 Wx 反爬做的很完善,所以尽量降低爬取的速度。
response = requests.get(current_request_url, headers=self.headers, verify=False)
result = response.json()
if result.get("ret") == 0:
msg_list = result.get('general_msg_list')
# 保存数据
self._save(msg_list)
self.logger.info("获取到一页数据成功, data=%s" % (msg_list))
# 获取下一页数据
has_next_page = result.get('can_msg_continue')
if has_next_page == 1:
# 继续爬取写一页的数据【通过next_offset】
next_offset = result.get('next_offset')
# 休眠2秒,继续爬下一页
time.sleep(2)
self.spider_more(next_offset)
else: # 当 has_next 为 0 时,说明已经到了最后一页,这时才算爬完了一个公众号的所有历史文章
print('爬取公号完成!')
else:
self.logger.info('无法获取到更多内容,请更新cookie或其他请求头信息')
def _save(self, msg_list):
"""
数据解析
:param msg_list:
:return:
"""
# 1.去掉多余的斜线,使【链接地址】可用
msg_list = msg_list.replace("\/", "/")
data = json.loads(msg_list)
# 2.获取列表数据
msg_list = data.get("list")
for msg in msg_list:
# 3.发布时间
p_date = msg.get('comm_msg_info').get('datetime')
# 注意:非图文消息没有此字段
msg_info = msg.get("app_msg_ext_info")
if msg_info: # 图文消息
# 如果是多图文推送,把第二条第三条也保存
multi_msg_info = msg_info.get("multi_app_msg_item_list")
# 如果是多图文,就从multi_msg_info中获取数据插入;反之直接从app_msg_ext_info中插入
if multi_msg_info:
for multi_msg_item in multi_msg_info:
self._insert(multi_msg_item, p_date)
else:
self._insert(msg_info, p_date)
else:
# 非图文消息
# 转换为字符串再打印出来
self.logger.warning(u"此消息不是图文推送,data=%s" % json.dumps(msg.get("comm_msg_info")))
from datetime import datetime
from mongoengine import connect
from mongoengine import DateTimeField
from mongoengine import Document
from mongoengine import IntField
from mongoengine import StringField
from mongoengine import URLField
__author__ = 'xag'
# 权限连接数据库【数据库设置了权限,这里必须指定用户名和密码】
response = connect('admin', host='localhost', port=27017,username='root', password='xag')
class Post(Document):
"""
文章【模型】
"""
title = StringField() # 标题
content_url = StringField() # 文章链接
source_url = StringField() # 原文链接
digest = StringField() # 文章摘要
cover = URLField(validation=None) # 封面图
p_date = DateTimeField() # 推送时间
author = StringField() # 作者
content = StringField() # 文章内容
read_num = IntField(default=0) # 阅读量
like_num = IntField(default=0) # 点赞数
comment_num = IntField(default=0) # 评论数
reward_num = IntField(default=0) # 点赞数
c_date = DateTimeField(default=datetime.now) # 数据生成时间
u_date = DateTimeField(default=datetime.now) # 数据最后更新时间
def _insert(self, msg_info, p_date):
"""
数据插入到 MongoDB 数据库中
:param msg_info:
:param p_date:
:return:
"""
keys = ['title', 'author', 'content_url', 'digest', 'cover', 'source_url']
# 获取有用的数据,构建数据模型
data = sub_dict(msg_info, keys)
post = Post(**data)
# 时间格式化
date_pretty = datetime.fromtimestamp(p_date)
post["p_date"] = date_pretty
self.logger.info('save data %s ' % post.title)
# 保存数据
try:
post.save()
except Exception as e:
self.logger.error("保存失败 data=%s" % post.to_json(), exc_info=True)
推荐使用工具 Robo3T 连接 MongoDB 数据库,可以查看到公号文章数据已经全部保存到数据库中。