【超实用】用Python语言实现定时任务的八个方法,建议收藏!
在日常工作中,我们常常会用到需要周期性执行的任务,一种方式是采用 Linux 系统自带的 crond 结合命令行实现。另外一种方式是直接使用Python。接下来整理的是常见的Python定时任务的八种实现方式。
利用while True: + sleep()实现定时任务
位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。
基于这样的特性我们可以通过while死循环+sleep()的方式实现简单的定时任务。
代码示例:
import datetime
import time
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
def loop_monitor():
while True:
time_printer()
time.sleep(5) # 暂停5秒
if __name__ == "__main__":
loop_monitor()
主要缺点:
-
只能设定间隔,不能指定具体的时间,比如每天早上8:00
-
sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
使用Timeloop库运行定时任务
Timeloop是一个库,可用于运行多周期任务。这是一个简单的库,它使用decorator模式在线程中运行标记函数。
示例代码:
import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
print "2s job current time : {}".format(time.ctime())
@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
print "5s job current time : {}".format(time.ctime())
@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
print "10s job current time : {}".format(time.ctime())
利用threading.Timer实现定时任务
threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
Timer(interval, function, args=[ ], kwargs={ })
-
interval: 指定的时间
-
function: 要执行的方法
-
args/kwargs: 方法的参数
备注:Timer只能执行一次,这里需要循环调用,否则只能执行一次
案例:Python使用threading.Timer实现执行可循环的定时任务_python timer 循环-CSDN博客
利用内置模块sched实现定时任务
sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。
代码示例:
import datetime
import time
import sched
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
loop_monitor()
def loop_monitor():
s = sched.scheduler(time.time, time.sleep) # 生成调度器
s.enter(5, 1, time_printer, ())
s.run()
if __name__ == "__main__":
loop_monitor()
scheduler对象主要方法:
-
enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
-
cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
-
run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
个人点评:比threading.Timer更好,不需要循环调用。
利用调度模块schedule实现定时任务
schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。
先来看代码,是不是不看文档就能明白什么意思?
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
装饰器:通过 @repeat() 装饰静态方法
import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
print('working...')
while True:
run_pending()
time.sleep(1)
传递参数:
import schedule
def greet(name):
print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
schedule.run_pending()
装饰器同样能传递参数:
from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@rep
原文地址:https://blog.csdn.net/weixin_41861301/article/details/135713503
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!