7.8. 实例:定时发送Email¶
我们不准备完全依靠标准库来实现,而准备选取第三方库——schedule。
原因在于:标准库一般意味着最原始最基础的功能,第三方库很多是去调用标准库中封装好了的操作函数。比如schedule,就是用time和datetime来实现的。
而对于我们需要的定时功能,time和datetime当然能实现,但操作逻辑会相对复杂;而schedule就是可以直接解决定时功能,代码比较简单,这是我们选择schedule的原因。
schedule用于定期作业的进程内调度程序,使用生成器模式进行配置。 通过Schedule,您可以使用简单,人性化的语法按预定的时间间隔定期运行Python函数
特点
一种易于使用的API,用于调度作业。
非常轻巧,没有外部依赖性。
出色的测试覆盖率。
在Python 2.7、3.5和3.6上测试过。
7.8.1. 实例¶
每天上午定时启动下面任务, 读取一个csv文档,随机读取一行内容 将内容发邮件到自己的邮箱。
首先涉及到定时任务,指定时间启动一个任务。 我们可以把这个任务定义为自定义函数,到时间调用这个函数即可,涉及到自定义函数。 其次是读取csv文档,csv其实相当于文本文档,然后读取这个文档有多少行,涉及到 Python 对文档的读写的知识。 然后生成一个行数以内的随机数,读取这个随机数所在行的内容保存到变量,涉及到 Python 列表的知识。 把这个变量的内容发送到自己的邮箱,这个就是Python 使用 yagmail 模块发送邮件的内容了。
>>> import schedule,time,random,yagmail
>>> from cfg import email_cfg
>>> def job():
>>> with open('demo.csv', 'r') as f:
>>> text = f.readlines()
>>> count = len(text)
>>> news_line = random.randint(1,count-1)
>>> news = text[news_line]
>>> yag = yagmail.SMTP(user=email_cfg['user'],password=email_cfg['passwd'] ,host='smtp.163.com')
>>> contents = [news]
>>> yag.send('alanyers@163.com','这是一条新闻',contents)
定义为每天运行已定义好的任务。并设置一个循环每分钟检查运行的状态。
>>> schedule.every().day.at('10:05').do(job)
>>> while True:
>>> schedule.run_pending()
>>> time.sleep(1)
schedule 执行定时任务的格式:
首先我们要定义一个函数,把想要执行的任务放进去
>>> def job():
>>> print('时间到了')
每10分钟执行一次任务
>>> import schedule
>>> schedule.every(10).minutes.do(job)
Every 10 minutes do job() (last run: [never], next run: 2020-08-19 10:39:56)
每小时执行一次任务
>>> schedule.every().hour.do(job)
Every 1 hour do job() (last run: [never], next run: 2020-08-19 11:07:12)
每天在什么时间点执行一次任务
>>> schedule.every().day.at('10:30').do(job)
Every 1 day at 10:30:00 do job() (last run: [never], next run: 2020-08-19 10:30:00)
每5-10分钟(随机)执行一次任务
>>> schedule.every(5).to(10).minutes.do(job)
Every 5 to 10 minutes do job() (last run: [never], next run: 2020-08-19 10:17:13)
每周一执行一次任务
>>> schedule.every().monday.do(job)
Every 1 week do job() (last run: [never], next run: 2020-08-24 10:07:14)
每周一什么时间点执行一次任务
>>> schedule.every().minute.at(':17').do(job)
Every 1 minute at 00:00:17 do job() (last run: [never], next run: 2020-08-19 10:07:17)
在程序最后,设置一个循环,每分钟检查一下任务执行状态。
>>> while True:
>>> schedule.run_pending()
>>> time.sleep(1)