首页 > 解决方案 > 使用 Django 启动和停止定期后台任务

问题描述

我想用 Django 做一个比特币通知。如果设法让 Telegram 机器人在我要求他这样做时发送比特币统计数据。现在,如果比特币达到特定值,我希望他给我发消息。有一些教程在服务器上运行 python 脚本,但没有在 Django 上运行。我阅读了一些关于 django 频道的答案和描述,但无法使它们适应我的项目。

我想通过电报发送关于金额和持续时间的命令。然后,Django 将使用这些值和我在后台发送的通道的值启动一个进程。如果现在在持续时间内达到金额,Django 会向我的频道发送一条消息。这对于一个以上的人也应该是可能的。

这些可能与开箱即用的Django有关,可能与装饰器有关,还是我需要django-channels或其他东西?

编辑 2018-08-10: 也许我的代码更好地解释了我想要做什么。

import requests
import json
from datetime import datetime

from django.shortcuts import render
from django.http import HttpResponse
from django.conf import settings

from django.views.generic import TemplateView
from django.views.decorators.csrf 
import csrf_exempt


class AboutView(TemplateView):
    template_name = 'telapi/about.html'


bot_token = settings.BOT_TOKEN


def get_url(method):
    return 'https://api.telegram.org/bot{}/{}'.format(bot_token, method)


def process_message(update):
    data = {}
    data['chat_id'] = update['message']['from']['id']
    data['text'] = "I can hear you!"
    r = requests.post(get_url('sendMessage'), data=data)


@csrf_exempt
def process_update(request, r_bot_token):
    ''' Method that is called from telegram-bot'''
    if request.method == 'POST' and r_bot_token == bot_token:
        update = json.loads(request.body.decode('utf-8'))
        if 'message' in update:
            if update['message']['text'] == 'give me news':
                new_bitcoin_price(update)
            else:
                process_message(update)
            return HttpResponse(status=200)


bitconin_api_uri = 'https://api.coinmarketcap.com/v2/ticker/1/?convert=EUR'
# response = requests.get(bitconin_api_uri)


def get_latest_bitcoin_price():
    response = requests.get(bitconin_api_uri)
    response_json = response.json()
    euro_price = float(response_json['data']['quotes']['EUR']['price'])
    timestamp = int(response_json['metadata']['timestamp'])
    date = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
    return euro_price, date


def new_bitcoin_price(update):
    data = {}
    data['chat_id'] = update['message']['from']['id']
    euro_price, date = get_latest_bitcoin_price()
    data['text'] = "Aktuel ({}) beträgt der Preis {:.2f}€".format(
        date, euro_price)
    r = requests.post(get_url('sendMessage'), data=data)

编辑 2018-08-13: 我认为解决方案是 celery-beat 和频道。有谁知道好的教程吗?

标签: djangopython-3.xtaskbotsdjango-channels

解决方案


我有同样的问题,有几种典型的方法:Celery、Django-Channels 等。但是你可以用简单的方法来避免它们:https ://docs.djangoproject.com/en/2.1/howto/custom-management-commands /

我在我的项目中使用了 django 命令来定期运行任务来重建用户统计信息:

  1. 实现自己的应用程序命令,例如您的应用程序名称是myapp并且您已放置my_periodic_task.pymyapp/management/commands文件夹中,因此您可以通过键入运行您的任务一次python manage.py my_periodic_task
  2. 放置在文件新文件旁边manage.py,例如background.py使用相同的代码:

-

import os
from subprocess import call

BASE = os.path.dirname(__file__)
MANAGE_BASE = os.path.join(BASE, 'manage.py')

while True:
    sleep(YOUR_TIMEOUT)
    call(['python', MANAGE_BASE , 'my_periodic_task'])
  1. 例如运行你的服务器:python background.py & python manage.py runserver 0.0.0.0:8000

推荐阅读