APScheduler(Python化的Cron)使用总结 定时任务

APScheduler(Python化的Cron)使用总结

简介

APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。官方文档:https://apscheduler.readthedocs.io/en/latest/userguide.html#basic-concepts

APScheduler安装

方法一:使用pip安装

$ pip install apscheduler 
方法二:如果pip不起作用,可以从pypi上下载最新的源码包(https://pypi.python.org/pypi/APScheduler/)进行安装:

$ python setup.py install 
APScheduler组件

triggers(触发器): 触发器中包含调度逻辑,每个作业都由自己的触发器来决定下次运行时间。除了他们自己初始配置意外,触发器完全是无状态的。

job stores(作业存储器):存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中。当作业被保存到一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化。作业存储器不能共享调度器。

executors(执行器):处理作业的运行,他们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

schedulers(调度器):配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。根据不同的应用场景可以选用不同的调度器,可选的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7种。

调度器

BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。 
BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)。 
AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。 
GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。 
TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。 
TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用 
QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。 
作业存储器

如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)

执行器

对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。

触发器

当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:

date 一次性指定日期 
interval 在某个时间范围内间隔多长时间执行一次 
cron 和Linux crontab格式兼容,最为强大 
date

最基本的一种调度,作业只会执行一次。它的参数如下:

run_date (datetime|str) – 作业的运行日期或时间 
timezone (datetime.tzinfo|str) – 指定时区 
举个栗子:

# 2016-12-12运行一次job_function
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
# 2016-12-12 12:00:00运行一次job_function
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
interval

间隔调度,参数如下:

weeks (int) – 间隔几周 
days (int) – 间隔几天 
hours (int) – 间隔几小时 
minutes (int) – 间隔几分钟 
seconds (int) – 间隔多少秒 
start_date (datetime|str) – 开始日期 
end_date (datetime|str) – 结束日期 
timezone (datetime.tzinfo|str) – 时区 
举个栗子:

# 每两个小时调一下job_function
sched.add_job(job_function, 'interval', hours=2)
cron

参数如下:

year (int|str) – 年,4位数字 
month (int|str) – 月 (范围1-12) 
day (int|str) – 日 (范围1-31) 
week (int|str) – 周 (范围1-53) 
day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) 
hour (int|str) – 时 (范围0-23) 
minute (int|str) – 分 (范围0-59) 
second (int|str) – 秒 (范围0-59) 
start_date (datetime|str) – 最早开始日期(包含) 
end_date (datetime|str) – 最晚结束时间(包含) 
timezone (datetime.tzinfo|str) – 指定时区 
举个栗子:

# job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截止到2016-12-30 00:00:00,每周一到周五早上五点半运行job_function
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')

配置调度程序

在应用程序中使用默认作业存储和默认执行程序运行BackgroundScheduler的例子:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
    # Initialize the rest of the application here, or before the scheduler initialization

这将生成一个名为“default”的MemoryJobStore和名为“default”的ThreadPoolExecutor的BackgroundScheduler,默认最大线程数为10。

如果不满足于当前配置,如希望使用两个执行器有两个作业存储器,并且还想要调整新作业的默认值并设置不同的时区,可按如下配置:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

 # 配置作业存储器
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 配置执行器,并设置线程数
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,     # 默认情况下关闭新的作业
    'max_instances': 3     # 设置调度程序将同时运行的特定作业的最大实例数3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

启动调度器

启动调度器只需要调用start()方法,对于除BlockingScheduler以外的调度程序,此调用将立即返回,您可以继续应用程序的初始化过程,可能会将作业添加到调度程序。

对于BlockingScheduler,只需在完成初始化步骤后调用start()

scheduler.start()

添加作业

方法一:调用add_job()方法

最常见的方法,add_job()方法返回一个apscheduler.job.Job实例,您可以稍后使用它来修改或删除该作业。

方法二:使用装饰器scheduled_job()

此方法主要是方便的声明在应用程序运行时不会改变的作业

删除作业

方法一:通过作业ID或别名调用remove_job()删除作业

方法二:通过add_job()返回的job实例调用remove()方法删除作业

举个栗子:

# 实例删除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
 # id删除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

暂停和恢复作业

可以通过Job实例或调度程序本身轻松暂停和恢复作业。 当作业暂停时,下一个运行时间将被清除,直到作业恢复,不会再计算运行时间。 要暂停作业,请使用以下任一方法:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复作业:

apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

获取作业列表

要获得计划作业的机器可处理列表,可以使用get_jobs()方法。 它将返回一个Job实例列表。 如果您只对特定作业存储中包含的作业感兴趣,则将作业存储别名作为第二个参数。

为了方便起见,可以使用print_jobs()方法,它将打印格式化的作业列表,触发器和下次运行时间。

修改作业属性

您可以通过调用apscheduler.job.Job.modify()或modify_job()来修改除id以外的任何作业属性。

job.modify(max_instances=6, name='Alternate name')

关闭调度程序

默认情况下,调度程序关闭其作业存储和执行程序,并等待所有当前正在执行的作业完成,wait=False参数可选,代表立即停止,不用等待。

scheduler.shutdown(wait=False) 
附:1、定时任务运行脚本小例子:

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler()   # 后台运行

 # 设置为每日凌晨00:30:30时执行一次调度程序
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
        print "schedule execute"
        sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':
    try:
        scheduler.start()
        sys_logging.debug("statistic scheduler start success")
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        sys_logging.debug("statistic scheduler start-up fail")

2、第一次使用此定时器时总会执行两次,一直不知道为什么,后来发现,python 的flask框架在debug模式下会多开一个线程监测项目变化,所以每次会跑两遍,可以将debug选项改为False

(0)

相关推荐

  • APScheduler中两种调度器的区别及使用过程中要注意的问题

    摘要 本文介绍APScheduler最基本的用法"定时几秒后启动job",解释其中两种调度器BackgroundScheduler和BlockingScheduler的区别,说明了 ...

  • Spring Boot 配置 Quartz 定时任务

    Quartz有四个核心概念: Job:是一个接口,只定义一个方法 execute(JobExecutionContext context),在实现接口的 execute 方法中编写所需要定时执行的 J ...

  • 定时执行执行任务-APScheduler的全称是Advanced Python Scheduler。

    from datetime import datetime from datetime import date import time from apscheduler.schedulers.back ...

  • Python APScheduler 定时任务

    下载 pip install apscheduler Hello World #!/usr/bin/env python from apscheduler.schedulers.blocking im ...

  • Python使用APScheduler实现定时任务

    APScheduler是基于Quartz的一个Python定时任务框架.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务.在线文档:https://apscheduler. ...

  • python接口自动化8-参数化

    前言 前面一篇实现了参数的关联,那种只是记流水账的完成功能,不便于维护,也没什么可读性,接下来这篇可以把每一个动作写成一个函数,这样更方便了. 参数化的思维只需记住一点:不要写死! 一.登录函数 1. ...

  • Python|加权平均法读取灰度化图像介

    问题描述灰度化的原理时假定每个像素点的三通道值相同,并用统一的灰度值待代替.加权平均法读取灰度化图像时,是将三个通道的通道值进行加权,然后用来代替灰度.实际中加权平均法RGB灰度化的公式为: 式中表示 ...

  • 火爆国外的Python教程,终于迎来了汉化版!

    有一本Python教程在国外爆火,被各大高校及机构争相采纳: 此书已被 NASA 采用,在喷气推进实验室及深空网络计划中使用: 阿姆斯特丹自由大学的 Principles of Programming ...

  • 年关降至,学会用Python定制化群发邮件

    Python网络爬虫与文本数据分析(视频课) 年关降至,肯定有群发短信发感谢信的需求,今天继续接着昨天的自动化办公教程来一个自动群发邮件.仍然假设是给员工核算工资,发送工资条详情顺带着表达一下公司的感 ...

  • 用 Python 开发 DeFi 去中心化应用(下)

    在本教程中,我们将介绍如何使用 Python 开发 DeFi 项目.去中心化金融 (DeFi) 是区块链和智能合约世界最重要的进步之一,通常被称为"新金融科技".在阅读文本前,请先 ...

  • Python|MUI-优化标题栏

    前言三步搭建MUI页面主框架法包括新建含mui的HTML文件.输入mheader(标题栏).输入mbody(主体).一个特色鲜明MUI界面无疑是能够吸引用户的关键之一,这利用css和JavaScrip ...