django - 使用 Django 启动和停止定期后台任务
问题描述
我想用 Django 做一个比特币通知。如果设法让 Telegram 机器人在我要求他这样做时发送比特币统计数据。现在,如果比特币达到特定值,我希望他给我发消息。有一些教程在服务器上运行 python 脚本,但没有在 Django 上运行。我阅读了一些关于 django 频道的答案和描述,但无法使它们适应我的项目。
我想通过电报发送关于金额和持续时间的命令。然后,Django 将使用这些值和我在后台发送的通道的值启动一个进程。如果现在在持续时间内达到金额,Django 会向我的频道发送一条消息。这对于一个以上的人也应该是可能的。
这些可能与开箱即用的Django有关,可能与装饰器有关,还是我需要django-channels或其他东西?
编辑 2018-08-10: 也许我的代码更好地解释了我想要做什么。
import requests
import json
from datetime import datetime
from django.shortcuts import render
from django.http import HttpResponse
from django.conf import settings
from django.views.generic import TemplateView
from django.views.decorators.csrf
import csrf_exempt
class AboutView(TemplateView):
template_name = 'telapi/about.html'
bot_token = settings.BOT_TOKEN
def get_url(method):
return 'https://api.telegram.org/bot{}/{}'.format(bot_token, method)
def process_message(update):
data = {}
data['chat_id'] = update['message']['from']['id']
data['text'] = "I can hear you!"
r = requests.post(get_url('sendMessage'), data=data)
@csrf_exempt
def process_update(request, r_bot_token):
''' Method that is called from telegram-bot'''
if request.method == 'POST' and r_bot_token == bot_token:
update = json.loads(request.body.decode('utf-8'))
if 'message' in update:
if update['message']['text'] == 'give me news':
new_bitcoin_price(update)
else:
process_message(update)
return HttpResponse(status=200)
bitconin_api_uri = 'https://api.coinmarketcap.com/v2/ticker/1/?convert=EUR'
# response = requests.get(bitconin_api_uri)
def get_latest_bitcoin_price():
response = requests.get(bitconin_api_uri)
response_json = response.json()
euro_price = float(response_json['data']['quotes']['EUR']['price'])
timestamp = int(response_json['metadata']['timestamp'])
date = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
return euro_price, date
def new_bitcoin_price(update):
data = {}
data['chat_id'] = update['message']['from']['id']
euro_price, date = get_latest_bitcoin_price()
data['text'] = "Aktuel ({}) beträgt der Preis {:.2f}€".format(
date, euro_price)
r = requests.post(get_url('sendMessage'), data=data)
编辑 2018-08-13: 我认为解决方案是 celery-beat 和频道。有谁知道好的教程吗?
解决方案
我有同样的问题,有几种典型的方法:Celery、Django-Channels 等。但是你可以用简单的方法来避免它们:https ://docs.djangoproject.com/en/2.1/howto/custom-management-commands /
我在我的项目中使用了 django 命令来定期运行任务来重建用户统计信息:
- 实现自己的应用程序命令,例如您的应用程序名称是
myapp
并且您已放置my_periodic_task.py
在myapp/management/commands
文件夹中,因此您可以通过键入运行您的任务一次python manage.py my_periodic_task
- 放置在文件新文件旁边
manage.py
,例如background.py
使用相同的代码:
-
import os
from subprocess import call
BASE = os.path.dirname(__file__)
MANAGE_BASE = os.path.join(BASE, 'manage.py')
while True:
sleep(YOUR_TIMEOUT)
call(['python', MANAGE_BASE , 'my_periodic_task'])
- 例如运行你的服务器:
python background.py & python manage.py runserver 0.0.0.0:8000
推荐阅读
- python - 在 csv 文件中查找 Word 并使用循环实现它
- amazon-web-services - 多实例cpuutilization告警,数据不足
- javascript - preventDefault() 不适用于单击事件处理程序上的 window.location.hash
- javascript - 如何从动态元素中获取文本?(禁用/启用 Javascript?)
- gnuplot - gnuplot:如何在图形中插入图像?
- git - 还原合并提交后解决合并冲突
- sql - SQL SERVER 中 Oracle 异常类型的等价物是什么
- android - 在自定义视图中设置字体(字体)可防止视图在 Android Studio 的 xml 布局中显示
- c# - .NET 5 项目不能真正使用旧的第 3 方 .NET Framework 2.x/4.x 程序集
- php - 在 Kotlin 中实现动态属性分配