【Solved】Adding a periodic Celery task to Flask to refresh the Azure token regularly

Flask · crifan · 3147 views · 0 comments

While working on:

【Solved】Integrating Microsoft Azure speech synthesis (TTS) into a local Flask environment

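During that work, the existing Flask app already implemented the basics:

  • getting a Microsoft Azure token

  • using that token to fetch TTS audio

On top of that, it needed a new periodic Celery task to refresh the token regularly, because an Azure token is only valid for 10 minutes.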
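Because of the 10-minute validity, whatever holds the token has to treat it as stale before that limit. As a complement to (not a replacement for) the periodic task, the same rule can be enforced on demand with a small cache. This is a minimal sketch under stated assumptions: `TokenCache`, `fetch_token` and the 60-second margin are hypothetical names and values, not part of the original app.

```python
import time

# Azure token validity stated above: 10 minutes.
TOKEN_LIFETIME_SECONDS = 10 * 60
# Hypothetical safety margin: treat the token as stale one minute early.
REFRESH_MARGIN_SECONDS = 60


class TokenCache:
    """Keeps one token and re-fetches it once it is close to expiring.

    fetch_token is a hypothetical zero-argument callable returning a new
    token string (e.g. a wrapper around the issueToken request).
    """

    def __init__(self, fetch_token, clock=time.monotonic):
        self._fetch_token = fetch_token
        self._clock = clock
        self._token = None
        self._fetched_at = 0.0

    def is_stale(self):
        # No token yet, or older than (lifetime - margin).
        age = self._clock() - self._fetched_at
        return self._token is None or age >= TOKEN_LIFETIME_SECONDS - REFRESH_MARGIN_SECONDS

    def get(self):
        if self.is_stale():
            self._token = self._fetch_token()
            self._fetched_at = self._clock()
        return self._token
```

With these values the cache re-fetches after 9 minutes, which is the same margin the periodic task below uses for its production interval.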

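celery periodic task

#celery# periodic tasks – Hochikong's Blog

Celery, the go-to tool for asynchronous tasks | FunHacks

Django Celery scheduled tasks and time settings – Yang Shihang's blog

Celery, a distributed task queue – Weidian tech team – SegmentFault 思否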

Periodic Tasks — Celery 4.1.0 documentation

Configuration and defaults — Celery 4.1.0 documentation

celery.bin.beat — Celery 4.1.0 documentation

First Steps with Celery — Celery 4.1.0 documentation

One thing was unclear to me:

in Flask, should Celery's timezone be configured as:

timezone = xxx

or

CELERY_TIMEZONE = xxx

flask celery timezone config

Configuration and defaults — Celery 4.1.0 documentation

It explains this clearly:

the old name was:

CELERY_TIMEZONE

and the new name (since Celery 4.0) is:

timezone

Checking the Celery version used here:

<code>➜  naturlingRobotDemoServer git:(master) ✗ pipenv graph
celery==4.1.0
</code>

Celery 4.x also still accepts the old name:

CELERY_TIMEZONE

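Here, in the Flask config file, I keep using the old names, because:

  • it keeps these setting names from being confused with Flask's other settings

  • by common convention, configuration names are best written in uppercase

    • so they read as constants rather than variables

So the old style is kept here: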

<code>CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
</code>
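Two facts support this choice. Flask's `Config.from_object()` only copies uppercase attributes, so a lowercase `timezone = ...` in a config class would be silently skipped. And Celery 4 refuses a configuration that mixes old and new setting names (it raises `ImproperlyConfigured`), so once `CELERY_TIMEZONE` is used, the other Celery settings should stay in the old style too. The sketch below is a hypothetical helper (`OLD_TO_NEW`, `uses_both_styles`, `rename_to_new_style` are illustrative names) showing the name mapping for the settings in this post and a simplified version of the mixing check; it is not Celery's own implementation.

```python
# A few old-style (uppercase) Celery setting names and their Celery 4
# lowercase equivalents. Only settings relevant to this post are listed.
OLD_TO_NEW = {
    "CELERY_TIMEZONE": "timezone",
    "CELERY_ENABLE_UTC": "enable_utc",
    "CELERYBEAT_SCHEDULE": "beat_schedule",
    "CELERYBEAT_SCHEDULE_FILENAME": "beat_schedule_filename",
}
NEW_NAMES = set(OLD_TO_NEW.values())


def uses_both_styles(config):
    """Simplified check: does this mapping mix old and new Celery names?"""
    has_old = any(key in OLD_TO_NEW for key in config)
    has_new = any(key in NEW_NAMES for key in config)
    return has_old and has_new


def rename_to_new_style(config):
    """Return a copy with the known old names replaced by the new ones.

    Keys that are not Celery settings (e.g. Flask's DEBUG) are kept as-is.
    """
    return {OLD_TO_NEW.get(key, key): value for key, value in config.items()}
```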

celery beat schedule timezone and execute time · Issue #4177 · celery/celery

celerybeat – celery beat timezone problems – Stack Overflow

python – Celery beat – different time zone per task – Stack Overflow

Further reading:

celery — Distributed processing — Celery 4.1.0 documentation

【Summary】

The final code:

app.py

<code>
from celery import Celery

# app (the Flask instance), log (a logger) and the global gMsToken are
# defined earlier in app.py; that part is omitted here.


def refreshMsToken():
    """refresh microsoft azure token for later call tts api"""
    global app, log, gMsToken

    log.info("refreshMsToken: gMsToken=%s", gMsToken)
    ...


def getMsToken():
    """get ms azure token"""
    refreshMsToken()

celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# pass the Flask config (including the CELERY_* settings) on to Celery
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)


#--------------------------------------
# Celery tasks
#--------------------------------------

# @celeryApp.task()
@celeryApp.task
# @celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    global log

    log.info("deleteTmpAudioFile: filename=%s", filename)
    ...

@celeryApp.task
def celeryRefreshMsToken():
    """celery's task: refreshMsToken"""
    refreshMsToken()

# Once Celery is configured, register the periodic entry: beat will send
# celeryRefreshMsToken every CELERY_REFRESH_MS_TOKEN_INTERVAL seconds.
@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: sender=%s", sender)

    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshMsToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")

</code>
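Celery also lets periodic tasks be declared statically in configuration (the `beat_schedule` setting, whose old-style name is `CELERYBEAT_SCHEDULE`) instead of connecting to `on_after_configure`. A sketch, assuming it sits next to the other `CELERY_*` settings in config.py so that `celeryApp.conf.update(app.config)` picks it up; `AZURE_TOKEN_LIFETIME` is a hypothetical name. Use either this or `add_periodic_task`, since keeping both is redundant.

```python
# config.py (sketch): static alternative to add_periodic_task.
# Old-style name CELERYBEAT_SCHEDULE, to match CELERY_TIMEZONE / CELERY_ENABLE_UTC.
AZURE_TOKEN_LIFETIME = 10 * 60              # seconds (hypothetical name)
CELERY_REFRESH_MS_TOKEN_INTERVAL = 60 * 9   # 9 minutes (< 10 minutes)

CELERYBEAT_SCHEDULE = {
    "refresh ms Azure token every less than 10 minutes": {
        # registered task name, as listed under [tasks] in the worker banner
        "task": "app.celeryRefreshMsToken",
        # a plain number is interpreted as an interval in seconds
        "schedule": CELERY_REFRESH_MS_TOKEN_INTERVAL,
    },
}

# Fail fast if the interval would let the token expire between refreshes.
assert CELERY_REFRESH_MS_TOKEN_INTERVAL < AZURE_TOKEN_LIFETIME
```

In the real config.py these lines would live inside the config class, indented like the existing settings.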

config.py

<code>
    # CELERY_TASK_NAME = "Celery_" + FLASK_APP_NAME
    # CELERY_BROKER_URL = "redis://localhost"
    CELERY_BROKER_URL = "redis://localhost:6379/0"
    # CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # result backend not used for now
    CELERY_DELETE_TMP_AUDIO_FILE_DELAY = 60 * 2 # two minutes

    # for periodical celery task
    CELERY_TIMEZONE = "Asia/Shanghai"
    CELERY_ENABLE_UTC = True
    # CELERY_REFRESH_MS_TOKEN_INTERVAL = 60 * 9  # 9 minutes (&lt; 10 minutes)
    # for debug
    CELERY_REFRESH_MS_TOKEN_INTERVAL = 20
</code>

Run it by starting each of the following in a separate terminal:

<code>celery worker -A app.celeryApp --loglevel=DEBUG
</code>

and:

<code>celery beat -A app.celeryApp --loglevel=DEBUG
</code>

Result:

Every 20 seconds, the beat process sends the task to the worker through the Redis broker:

<code>➜  naturlingRobotDemoServer git:(master) ✗ celery beat -A app.celeryApp --loglevel=DEBUG
[2018-05-23 17:52:29,941] INFO in app: app=&lt;Flask 'app'&gt;
[2018-05-23 17:52:29,943] INFO in app: api=&lt;flask_restful.Api object at 0x10db60ba8&gt;
[2018-05-23 17:52:29,947] INFO in app: celeryApp=&lt;Celery app at 0x10cba09e8&gt;
[2018-05-23 17:52:29,948] INFO in app: aiContext=&lt;DialogueManager.Context object at 0x10dadf518&gt;
…
[2018-05-23 17:52:30,814] INFO in app: celerySetupPeriodicTasks: sender=&lt;Celery app at 0x10cba09e8&gt;
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -&gt; 2018-05-23 17:52:30
Configuration -&gt;
    . broker -&gt; redis://localhost:6379/0
    . loader -&gt; celery.loaders.app.AppLoader
    . scheduler -&gt; celery.beat.PersistentScheduler
    . db -&gt; celerybeat-schedule
    . logfile -&gt; [stderr]@%DEBUG
    . maxinterval -&gt; 5.00 minutes (300s)
[2018-05-23 17:52:30,892: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-05-23 17:52:30,893: INFO/MainProcess] beat: Starting...
[2018-05-23 17:52:30,995: DEBUG/MainProcess] Current schedule:
&lt;ScheduleEntry: refresh ms Azure token every less than 10 minutes app.celeryRefreshMsToken() &lt;freq: 20.00 seconds&gt;
&lt;ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() &lt;crontab: 0 4 * * * (m/h/d/dM/MY)&gt;
[2018-05-23 17:52:30,995: DEBUG/MainProcess] beat: Ticking with max interval-&gt;5.00 minutes
[2018-05-23 17:52:31,006: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-23 17:52:31,026: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-05-23 17:52:31,059: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id-&gt;dd150a9f-0271-41f7-8163-d97a47660f6c
[2018-05-23 17:52:31,061: DEBUG/MainProcess] beat: Waking up in 19.93 seconds.
[2018-05-23 17:52:50,997: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-23 17:52:50,998: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id-&gt;05a3eebf-ae01-44f3-9d8d-86fa422ca9c8
[2018-05-23 17:52:50,998: DEBUG/MainProcess] beat: Waking up in 19.99 seconds.
[2018-05-23 17:53:10,997: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-23 17:53:10,999: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id-&gt;ecc0af10-627f-4e6a-a54d-cf4c6c5e8331
[2018-05-23 17:53:10,999: DEBUG/MainProcess] beat: Waking up in 19.99 seconds.
</code>
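The "Waking up in 19.93 seconds" lines follow from a simple rule: for an interval entry, the time left is last run + interval - now; when that reaches zero the entry is sent and the next run is a full interval away, and beat never sleeps longer than its maxinterval (300 s in the log). The sketch below is a simplified model of that rule for illustration (`check_entry` and `sleep_time` are hypothetical names), not Celery's actual source.

```python
# Simplified model of how beat handles a fixed-interval entry
# (illustration only, not Celery's source code).
MAX_INTERVAL = 300.0  # "maxinterval -> 5.00 minutes (300s)" in the log


def check_entry(last_run_at, now, run_every):
    """Return (is_due, seconds_until_next_check) for a fixed-interval entry."""
    remaining = last_run_at + run_every - now
    if remaining <= 0:
        # Due: send it now; the next run is one full interval away.
        return True, float(run_every)
    return False, remaining


def sleep_time(next_checks, max_interval=MAX_INTERVAL):
    """Sleep until the soonest entry, but never longer than max_interval."""
    return min(min(next_checks), max_interval)
```

With the 20-second debug interval, an entry checked 0.07 s after its last run reports about 19.93 s remaining, which matches the first "Waking up" line; the daily `celery.backend_cleanup` entry is far away, so it does not shorten the sleep.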

When the worker receives the message, it runs the task:

<code>[2018-05-23 18:04:34,279] INFO in app: celerySetupPeriodicTasks: sender=&lt;Celery app at 0x106d559e8&gt;
[2018-05-23 18:04:34,392: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2018-05-23 18:04:34,396: DEBUG/MainProcess] | Worker: Building graph...
[2018-05-23 18:04:34,397: DEBUG/MainProcess] | Worker: New boot order: {StateDB, Timer, Hub, Pool, Autoscaler, Beat, Consumer}
[2018-05-23 18:04:34,418: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2018-05-23 18:04:34,418: DEBUG/MainProcess] | Consumer: Building graph...
[2018-05-23 18:04:34,441: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Agent, Mingle, Tasks, Control, Gossip, event loop}


[email protected] v4.1.0 (latentcall)

Darwin-17.5.0-x86_64-i386-64bit-PE 2018-05-23 18:04:34

[config]
.&gt; app:         app:0x106d559e8
.&gt; transport:   redis://localhost:6379/0
.&gt; results:     disabled://
.&gt; concurrency: 4 (prefork)
.&gt; task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.&gt; celery           exchange=celery(direct) key=celery


[tasks]
  . app.celeryRefreshMsToken
  . app.deleteTmpAudioFile
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap

[2018-05-23 18:04:34,504: DEBUG/MainProcess] | Worker: Starting Hub
[2018-05-23 18:04:34,506: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,506: DEBUG/MainProcess] | Worker: Starting Pool
[2018-05-23 18:04:34,789: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,791: DEBUG/MainProcess] | Worker: Starting Consumer
[2018-05-23 18:04:34,792: DEBUG/MainProcess] | Consumer: Starting Connection
[2018-05-23 18:04:34,838: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-05-23 18:04:34,839: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,839: DEBUG/MainProcess] | Consumer: Starting Events
[2018-05-23 18:04:34,864: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,864: DEBUG/MainProcess] | Consumer: Starting Heart
[2018-05-23 18:04:34,870: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:34,870: DEBUG/MainProcess] | Consumer: Starting Mingle
[2018-05-23 18:04:34,870: INFO/MainProcess] mingle: searching for neighbors
[2018-05-23 18:04:35,904: INFO/MainProcess] mingle: all alone
[2018-05-23 18:04:35,904: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,905: DEBUG/MainProcess] | Consumer: Starting Tasks
[2018-05-23 18:04:35,910: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,910: DEBUG/MainProcess] | Consumer: Starting Control
[2018-05-23 18:04:35,914: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,915: DEBUG/MainProcess] | Consumer: Starting Gossip
[2018-05-23 18:04:35,919: DEBUG/MainProcess] ^-- substep ok
[2018-05-23 18:04:35,919: DEBUG/MainProcess] | Consumer: Starting event loop
[2018-05-23 18:04:35,919: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2018-05-23 18:04:35,920: INFO/MainProcess] [email protected] ready.
[2018-05-23 18:04:35,920: DEBUG/MainProcess] basic.qos: prefetch_count-&gt;16
[2018-05-23 18:04:42,933: INFO/MainProcess] Received task: app.celeryRefreshMsToken[3ab5b8a8-52a7-449f-8f87-871c77b525a1]
[2018-05-23 18:04:42,935: DEBUG/MainProcess] TaskPool: Apply &lt;function _fast_trace_task at 0x108649268&gt; (args:('app.celeryRefreshMsToken', '3ab5b8a8-52a7-449f-8f87-871c77b525a1', {'lang': 'py', 'task': 'app.celeryRefreshMsToken', 'id': '3ab5b8a8-52a7-449f-8f87-871c77b525a1', 'eta': None, 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '3ab5b8a8-52a7-449f-8f87-871c77b525a1', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': '[email protected]', 'reply_to': '3b8e4f1a-e52b-352e-b301-07fe3e100c15', 'correlation_id': '3ab5b8a8-52a7-449f-8f87-871c77b525a1', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-05-23 18:04:42,952: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:42,949] INFO in app: refreshMsToken: gMsToken=eyJxxxzvb8
[2018-05-23 18:04:42,954: DEBUG/MainProcess] Task accepted: app.celeryRefreshMsToken[3ab5b8a8-52a7-449f-8f87-871c77b525a1] pid:28371
[2018-05-23 18:04:42,949: INFO/ForkPoolWorker-1] refreshMsToken: gMsToken=eyxxxb8
[2018-05-23 18:04:42,957: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:42,956] INFO in app: getMsTokenUrl=https://westus.api.cognitive.microsoft.com/sts/v1.0/issueToken, reqHeaders={'Ocp-Apim-Subscription-Key': '2240xxxxa2'}
[2018-05-23 18:04:42,956: INFO/ForkPoolWorker-1] getMsTokenUrl=https://westus.api.cognitive.microsoft.com/sts/v1.0/issueToken, reqHeaders={'Ocp-Apim-Subscription-Key': '2240xxxxa2'}
[2018-05-23 18:04:42,973: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): westus.api.cognitive.microsoft.com
[2018-05-23 18:04:43,989: DEBUG/ForkPoolWorker-1] https://westus.api.cognitive.microsoft.com:443 "POST /sts/v1.0/issueToken HTTP/1.1" 200 517
[2018-05-23 18:04:43,993: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:43,993] INFO in app: resp=&lt;Response [200]&gt;
[2018-05-23 18:04:43,993: INFO/ForkPoolWorker-1] resp=&lt;Response [200]&gt;
[2018-05-23 18:04:43,994: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:43,994] INFO in app: respTokenText=eyJ0eXAiOiJKV1QiLxxxxxGyQrH0
[2018-05-23 18:04:43,994: INFO/ForkPoolWorker-1] respTokenText=exxxxH0
[2018-05-23 18:04:43,995: WARNING/ForkPoolWorker-1] [2018-05-23 18:04:43,995] INFO in app: after refresh: gMsToken=eyJxxxrH0
[2018-05-23 18:04:43,995: INFO/ForkPoolWorker-1] after refresh: gMsToken=eyJxxx0
</code>
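The log also shows where the refresh happens: in a worker child process (`ForkPoolWorker-1`), so it updates that process's own `gMsToken`. Python globals are not shared between processes, so the Flask web process, started separately, does not see the new token through its own `gMsToken`. One way to share it, assuming the Redis instance already used as the broker, is to store the token there with an expiry. A sketch with hypothetical names (`AZURE_TOKEN_KEY`, `save_token`, `load_token`); `store` is anything with redis-py style `set(name, value, ex=...)` and `get(name)`, e.g. `redis.Redis.from_url("redis://localhost:6379/0")`. The in-memory `FakeStore` is only for trying it without Redis.

```python
import time

AZURE_TOKEN_KEY = "ms_azure_token"   # hypothetical Redis key
TOKEN_TTL_SECONDS = 10 * 60 - 30     # expire a little before the 10-minute lifetime


def save_token(store, token):
    """Called from the Celery task after fetching a new token."""
    store.set(AZURE_TOKEN_KEY, token, ex=TOKEN_TTL_SECONDS)


def load_token(store):
    """Called from the Flask process; returns None if missing or expired."""
    value = store.get(AZURE_TOKEN_KEY)
    if isinstance(value, bytes):  # redis-py returns bytes by default
        value = value.decode("utf-8")
    return value


class FakeStore:
    """In-memory stand-in mimicking set(name, value, ex=seconds) and get(name)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._data = {}

    def set(self, name, value, ex=None):
        expires_at = None if ex is None else self._clock() + ex
        self._data[name] = (value, expires_at)
        return True

    def get(self, name):
        item = self._data.get(name)
        if item is None:
            return None
        value, expires_at = item
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[name]
            return None
        return value
```

With a 9-minute refresh interval and a 9.5-minute TTL, a new token is normally written about 30 seconds before the old key expires.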

【Postscript 1】

I noticed that a file was created in the current directory:

celerybeat-schedule

I wanted to put it in a separate folder instead.

celerybeat-schedule location

celerybeat-schedule file location

Periodic Tasks — Celery 4.1.0 documentation

“The default scheduler (storing the schedule in the celerybeat-schedule file) will automatically detect that the time zone has changed …

Beat needs to store the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs access to write in the current directory, or alternatively you can specify a custom location for this file:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule”

So adding the -s option with the desired path is enough.

The beat command becomes:

<code>celery beat -A app.celeryApp -s runtime/celerybeat-schedule --loglevel=DEBUG
</code>

Result:

<code>[2018-05-24 10:33:37,733] INFO in app: celerySetupPeriodicTasks: sender=&lt;Celery app at 0x104f3e9e8&gt;
celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -&gt; 2018-05-24 10:33:37
Configuration -&gt;
    . broker -&gt; redis://localhost:6379/0
    . loader -&gt; celery.loaders.app.AppLoader
    . scheduler -&gt; celery.beat.PersistentScheduler
    . db -&gt; runtime/celerybeat-schedule
    . logfile -&gt; [stderr]@%DEBUG
    . maxinterval -&gt; 5.00 minutes (300s)
[2018-05-24 10:33:37,792: DEBUG/MainProcess] Setting default socket timeout to 30
[2018-05-24 10:33:37,793: INFO/MainProcess] beat: Starting...
[2018-05-24 10:33:37,914: DEBUG/MainProcess] Current schedule:
&lt;ScheduleEntry: refresh ms Azure token every less than 10 minutes app.celeryRefreshMsToken() &lt;freq: 20.00 seconds&gt;
&lt;ScheduleEntry: celery.backend_cleanup celery.backend_cleanup() &lt;crontab: 0 4 * * * (m/h/d/dM/MY)&gt;
[2018-05-24 10:33:37,915: DEBUG/MainProcess] beat: Ticking with max interval-&gt;5.00 minutes
[2018-05-24 10:33:37,916: DEBUG/MainProcess] beat: Waking up in 19.93 seconds.
[2018-05-24 10:33:57,853: DEBUG/MainProcess] beat: Synchronizing schedule...
[2018-05-24 10:33:57,886: INFO/MainProcess] Scheduler: Sending due task refresh ms Azure token every less than 10 minutes (app.celeryRefreshMsToken)
[2018-05-24 10:33:57,907: DEBUG/MainProcess] app.celeryRefreshMsToken sent. id-&gt;7400d508-c8aa-4bdd-90d9-d5d0cfb6b504
[2018-05-24 10:33:57,911: DEBUG/MainProcess] beat: Waking up in 19.95 seconds.
</code>

The output shows:

db -> runtime/celerybeat-schedule

and the schedule file is now created in the runtime folder.
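Instead of passing -s on every start, the same path can be set in configuration through Celery's `beat_schedule_filename` setting (old-style name `CELERYBEAT_SCHEDULE_FILENAME`, matching the other `CELERY_*` settings here). Because the default scheduler stores this file with Python's `shelve` module, the directory most likely has to exist before beat starts. A sketch, assuming beat is started from the project root; `RUNTIME_DIR` and `ensure_runtime_dir` are hypothetical names.

```python
import os

# Directory for runtime files, relative to where beat is started
# (same location as "-s runtime/celerybeat-schedule").
RUNTIME_DIR = "runtime"
# Old-style name of the beat_schedule_filename setting.
CELERYBEAT_SCHEDULE_FILENAME = os.path.join(RUNTIME_DIR, "celerybeat-schedule")


def ensure_runtime_dir(path=RUNTIME_DIR):
    """Create the directory if needed (safe to call repeatedly)."""
    os.makedirs(path, exist_ok=True)
    return path
```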

【Postscript 2】

Later, in:

Calling Tasks — Celery 4.1.0 documentation

I came across:

“There’s another way…

You’ll learn more about this later while reading about the Canvas, but signature's are objects used to pass around the signature of a task invocation, (for example to send it over the network), and they also support the Calling API:

task.s(arg1, arg2, kwarg1='x', kwargs2='y').apply_async()”

->

celery — Distributed processing — Celery 4.1.0 documentation

“class celery.Signature(task=None, args=None, kwargs=None, options=None, type=None, subtask_type=None, immutable=False, app=None, **ex)[source]

Task Signature.”

That is when I realized that in the earlier code:

<code>    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshMsToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
</code>

the part:

<code>celeryRefreshMsToken.s()
</code>

can also be written as:

<code>celeryRefreshMsToken.signature()
</code>

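Here .s() is the shortcut for .signature(): it creates a task signature, i.e. the task bundled with its arguments and options (none here), not a Python function signature. Creating the signature does not run the task. add_periodic_task only stores it in the beat schedule, and each time the entry is due, beat sends it to the broker, which has the same effect as calling: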

<code>celeryRefreshMsToken.apply_async()
</code>
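A rough pure-Python analogy for "creating a signature is not calling the task" is `functools.partial`: it packages a callable with its arguments and runs nothing until invoked. The analogy is limited, since a Celery signature is serializable and is executed by a worker after going through the broker, not called locally. `refresh_token` and its `region` argument are hypothetical stand-ins.

```python
from functools import partial

calls = []


def refresh_token(region="westus"):
    """Stand-in for the real task body; just records that it ran."""
    calls.append(region)
    return "token-for-" + region


# Roughly like celeryRefreshMsToken.s(): package the call, do not run it.
deferred = partial(refresh_token, region="westus")
# At this point nothing has run: calls == []

# Roughly like beat sending the signature when the entry is due:
# only now is the function actually executed.
result = deferred()
```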

When reposting, please credit: 在路上 » 【Solved】Adding a periodic Celery task to Flask to refresh the Azure token regularly
