7.8. 实例:定时发送Email

我们不准备完全依靠标准库来实现,而准备选取第三方库——schedule。

原因在于:标准库一般意味着最原始最基础的功能,第三方库很多是去调用标准库中封装好了的操作函数。比如schedule,就是用time和datetime来实现的。

而对于我们需要的定时功能,time和datetime当然能实现,但操作逻辑会相对复杂;而schedule就是可以直接解决定时功能,代码比较简单,这是我们选择schedule的原因。

schedule用于定期作业的进程内调度程序,使用生成器模式进行配置。 通过Schedule,您可以使用简单,人性化的语法按预定的时间间隔定期运行Python函数

特点

  • 一种易于使用的API,用于调度作业。

  • 非常轻巧,没有外部依赖性。

  • 出色的测试覆盖率。

  • 在Python 2.7、3.5和3.6上测试过。

7.8.1. 实例

每天上午定时启动下面任务, 读取一个csv文档,随机读取一行内容 将内容发邮件到自己的邮箱。

首先涉及到定时任务,指定时间启动一个任务。 我们可以把这个任务定义为自定义函数,到时间调用这个函数即可,涉及到自定义函数。 其次是读取csv文档,csv其实相当于文本文档,然后读取这个文档有多少行,涉及到 Python 对文档的读写的知识。 然后生成一个行数以内的随机数,读取这个随机数所在行的内容保存到变量,涉及到 Python 列表的知识。 把这个变量的内容发送到自己的邮箱,这个就是Python 使用 yagmail 模块发送邮件的内容了。

>>> import schedule,time,random,yagmail
>>> from cfg import email_cfg
>>> def job():
>>>     with open('demo.csv', 'r') as f:
>>>         text = f.readlines()
>>>         count = len(text)
>>>         news_line = random.randint(1,count-1)
>>>         news = text[news_line]
>>>     yag = yagmail.SMTP(user=email_cfg['user'],password=email_cfg['passwd'] ,host='smtp.163.com')
>>>     contents = [news]
>>>     yag.send('alanyers@163.com','这是一条新闻',contents)

定义为每天运行已定义好的任务。并设置一个循环每分钟检查运行的状态。

>>> schedule.every().day.at('10:05').do(job)
>>> while True:
>>>     schedule.run_pending()
>>>     time.sleep(1)

schedule 执行定时任务的格式:

首先我们要定义一个函数,把想要执行的任务放进去

>>> def job():
>>>     print('时间到了')

每10分钟执行一次任务

>>> import schedule
>>> schedule.every(10).minutes.do(job)
Every 10 minutes do job() (last run: [never], next run: 2020-08-19 10:39:56)

每小时执行一次任务

>>> schedule.every().hour.do(job)
Every 1 hour do job() (last run: [never], next run: 2020-08-19 11:07:12)

每天在什么时间点执行一次任务

>>> schedule.every().day.at('10:30').do(job)
Every 1 day at 10:30:00 do job() (last run: [never], next run: 2020-08-19 10:30:00)

每5-10分钟(随机)执行一次任务

>>> schedule.every(5).to(10).minutes.do(job)
Every 5 to 10 minutes do job() (last run: [never], next run: 2020-08-19 10:17:13)

每周一执行一次任务

>>> schedule.every().monday.do(job)
Every 1 week do job() (last run: [never], next run: 2020-08-24 10:07:14)

每周一什么时间点执行一次任务

>>> schedule.every().minute.at(':17').do(job)
Every 1 minute at 00:00:17 do job() (last run: [never], next run: 2020-08-19 10:07:17)

在程序最后,设置一个循环,每分钟检查一下任务执行状态。

>>> while True:
>>>     schedule.run_pending()
>>>     time.sleep(1)