Python教你从0搭建微信推送斗鱼直播提醒(单房间简化版)

重新推出重制版,单房间推送提醒版本(后文有扩展多房间思路),适合没有服务器的小伙伴学习使用。

Gitee文档同步更新:https://gitee.com/LGW_space/dy-live-lintener
语雀文档更为详细(包含成品案例):https://www.yuque.com/books/share/fdc7c120-e4eb-47d5-93de-dcf5d347e5a8?# 《斗鱼直播提醒服务》

www.zuowenge.cn

斗鱼直播提醒 ——简化版

功能介绍:

当然在学习Python的道路上肯定会困难,没有好的学习资料,怎么去学习呢?
学习Python中有不明白推荐加入交流Q群号:928946953 群里有志同道合的小伙伴,互帮互助, 群里有不错的视频学习教程和PDF!
还有大牛解答!
  • 主播开播后或更换房间标题后推送提醒到微信(单房间)

    > 理论上可以配置多个房间,可以达到和斗鱼全房间推送的效果一样。

项目截图:

[TOC]

一、项目准备

1. 用到的技术内容

  • 云函数

  • 对象存储(为什么使用对象存储,下文有介绍)

  • python(3.6/3.7)

  • WxPusher

2. 需要的工具

  • 腾讯云(云函数和云对象存储)

二、开通对象存储

这里我们使用对象存储相当于数据库的一个作用,为什么使用对象存储呢?
> 对象存储采用扁平的文件组织方式,所以在文件量上升至千万、亿级别,容量在PB级别的时候,这种文件组织方式下的性能优势就显现出来了,文件不在有目录树深度的问题,历史和近线数据有同样的访问效率。另外,对象存储多采用分布式架构,

简而言之就是便宜效率高

官方入门步骤

1. 注册腾讯云

官方的注册方法

2. 开通 COS 服务

在 腾讯云控制台 中,选择【云产品】>【对象存储】,进入 COS 控制台,按照界面提示开通 COS 服务。(如果您已开通,请跳过该步骤。)

3. 创建存储桶

我们需要创建一个用于存放对象的存储桶:

  1. 在 对象存储控制台 左侧导航栏中单击【存储桶列表】,进入存储桶管理页。

  2. 单击【创建存储桶】,输入以下配置信息,其他配置保持默认即可。

    • 名称:输入存储桶名称。名称设置后不可修改。此处举例输入 examplebucket。

    • 所属地域:存储桶所属地域,选择与您业务最近的一个地区,例如广州地域。

    • 访问权限:存储桶访问权限,此处我们保持默认为“私有读写”。

  3. 单击【确定】,即可创建完成。

三、云函数的开发

云函数我们使用腾讯云函数作为示例,阿里云也同理。(阿里云似乎是不允许触发器的触发周期低于1分钟)

1. 注册腾讯云

官方的注册方法

2. 创建云函数

  • 登录腾讯云,在云产品里找到云函数,在【函数服务】里新建云函数

  • 选择【自定义创建】

  • 修改【函数名称】、【地域】、【运行环境(Python3.6)】、【提交方法(在线编辑)】、【执行方法(index.main_handler)】暂时不用修改默认代码。

  • 点击完成

3. 完善功能

在写代码之前,我们需要用到这几个关键信息:

  • appid :你的APPID(账号信息里有)

  • secret_id : 你的 SecretId (API密钥管理里获取)

  • secret_key :你的 SecretKey(API密钥管理里获取)

  • region :存储桶所在的地域

  • BUCKET:存储桶名称

  • token :没有可为空

  • FILE_NAME :你要上传的文件名 如room_info

  • WxPusherToken :WxPusher的用户appToken

3.1 引入所需包,完成基本配置

复制代码 隐藏代码# -*- coding: utf8 -*-from qcloud_cos_v5 import CosConfigfrom qcloud_cos_v5 import CosS3Clientfrom qcloud_cos_v5 import CosServiceErrorfrom qcloud_cos_v5 import CosClientErrorimport datetimeimport jsonimport loggingimport requestsfrom urllib.parse import parse_qs

logger = logging.getLogger()
logger.setLevel(logging.ERROR) # 默认打印 INFO 级别日志,可根据需要调整为 DEBUG、WARNING、ERROR、CRITICAL 级日志appid = 123123  # Please replace with your APPID. 请替换为您的 APPIDsecret_id = u'xxx'  # Please replace with your SecretId. 请替换为您的 SecretIdsecret_key = u'xxx'  # Please replace with your SecretKey. 请替换为您的 SecretKeyregion = u'ap-shanghai'  # Please replace with the region where COS bucket located. 请替换为您bucket 所在的地域BUCKET = 'xxx'FILE_NAME = 'xxx'token = ''WxPusherToken = 'xxxxxx'# 配置存储桶config = CosConfig(Secret_id=secret_id, Secret_key=secret_key, Region=region, Token=token)
client = CosS3Client(config)

注:qcloud_cos_v5为 COS的SDK

3.2 创建方法 根据房间号获取直播状态

斗鱼第三方API,通过房间号获取房间信息,自行百度获取。
最后返回 房间信息

复制代码 隐藏代码def getRoomInfo():
url = '斗鱼官方API/房间号'try:
r = requests.get(url, timeout=5)except requests.exceptions.RequestException as e:print(str(e))return 'timeout'else:if r.status_code == 404:return '404'json_str = json.loads(r.text)if isinstance(json_str, dict):if 'error' in json_str.keys() and json_str['error'] == 0:
room_info = json_str['data']return room_info

3.3 创建方法 发送WxPusher消息

复制代码 隐藏代码def send_WxPusher_msg(c):
    headers = {        'Content-Type': 'application/json'
    }
    url = 'http://wxpusher.zjiecode.com/api/send/message'
    parameter = {        "appToken": WxPusherToken,        "content": c,        "contentType": 1,    # 内容类型 1表示文字  2表示html(只发送body标签内部的数据即可,不包括body标签) 3表示markdown
        "topicIds":[
            你的主题号
        ]
    }
    r = requests.post(url=url, headers=headers,
                      data=json.dumps(parameter, ensure_ascii=False).encode('utf-8'))
    json_str = json.loads(r.text)
    datas = json_str['data']    for data in datas:        if data['code'] == 1001:
            logger.error(data['status'])

3.4 创建方法 更新(上传)文件

设置更新时间字段,用于锁定更新,在指定时间内不允许更新/重复推送(我猜是腾讯云多线程的问题,会导致重复更新、重复推送)

复制代码 隐藏代码def updateLocalInfo(roominfo):
DIC = {"room_id": roominfo["room_id"],"room_status": roominfo["room_status"],"room_name": roominfo["room_name"],"update_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
response = client.put_object(
Bucket=BUCKET,
Body=DIC,
Key=FILE_NAME
)print("更新成功:",response['ETag'])

3.5 创建方法 获取文件

url获取:上传文件后,在存储桶内点击文件的详情基本信息对象地址信息
https://存储桶名.cos.地域.myqcloud.com/文件名

复制代码 隐藏代码def getLocalInfo():
url = '文件的对象地址'r = requests.get(url)
info_str = r.text
params = parse_qs(info_str)
result = {key: params[key][0] for k
ey in params}return result

3.6 创建方法 信息整合

复制代码 隐藏代码def time_last(time_str): # 计算时间差
    t_r = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    now_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    t_n = datetime.datetime.strptime(now_time_str, "%Y-%m-%d %H:%M:%S")    return (t_n - t_r).secondsdef checkRoomInfo():
    room = getLocalInfo() # 获取历史房间信息
    curr_info = getRoomInfo() # 获取当前真实房间信息
    time_l = time_last(room['update_time']) # 计算当前时间和上次更新时间的时间差
    # 直播开播提醒
    if curr_info['room_status'] == "1": # 正在直播
        if room['room_status'] == "2":  # 之前状态为 未开播
            if time_l > 300:  # 300秒内无法重复推送
                content = '【开播】您关注的主播:' + curr_info['owner_name'] + '开始直播啦' + \                            '\n房间标题:' + curr_info['room_name'] + \                            '\n正在直播:' + curr_info['cate_name']
                send_WxPusher_msg(content)
                updateLocalInfo(roominfo=curr_info)            else:
                logger.error(                        '【开播】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(
                            time_l))    else:        if room['room_status'] == "1":  # 关闭了直播
            updateLocalInfo(roominfo=curr_info)    # 更换房间标题提醒
    if curr_info['room_name'] != room['room_name']:        if time_l > 300:
            content1 = '【更换标题】您关注的主播:' + curr_info['owner_name'] + '改变了房间标题' + \                        '\n当前房间标题:' + curr_info['room_name']
            send_WxPusher_msg(content1)
            updateLocalInfo(roominfo=curr_info)        else:
                logger.error(                        '【更换标题】房间号:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n时间差:' + str(
                            time_l))

到这所有工能就基本全部完成啦~

(0)

相关推荐