首页 > 解决方案 > “NameError: name 'await' ...”当我尝试将 python 脚本放入 CentOS systemd 服务时

问题描述

以下代码在 Jupyter 上正常工作,但我不知道为什么当我尝试将其投入使用时它不起作用systemd

#cdn_crawl_mongo_gavahi_aati
import requests
import json
import asyncio
import websockets
import urllib
import random
import time
import datetime
import pandas as pd
from threading import Thread
import numpy as np
import matplotlib.pyplot as plt
import pymongo

myclient = pymongo.MongoClient("mongodb://localhost:27017/", username='mongo-admin', password='password')
mydb = myclient["admin"]

pd.set_option('display.max_columns', None)  # or 1000
pd.set_option('display.max_rows', None)

path_data_future = '/root/data_crwl/cdn/future_contract/'
#zaferan = ['SAFOR99', 'SAFTR99', 'SAFSH99']
zaferan = ['SAFTR99', 'SAFSH99']
zaferan_govahi = ['IRK1A0389983', 'IRK1A0449985']


def wrt_df_csv(dataframe, data_type = 'tahvil'):
    # filename            
    data_file_time_stamp = str(datetime.datetime.now()).replace(' ','--') + str(datetime.datetime.now().timestamp())
    data_file_time_stamp = data_file_time_stamp + '.csv'
    data_file_time_stamp = data_file_time_stamp.replace(':',"-")
    #write to file
    print(len(dataframe))
    if data_type == 'tahvil':
        data_file_time_stamp = path_data_future+ 'tahvil/' + data_file_time_stamp
    else:
        data_file_time_stamp = path_data_future+ 'govahi/' + data_file_time_stamp
    dataframe.to_csv(data_file_time_stamp, sep=',', encoding='utf-16')
    print(data_file_time_stamp)
    return 

def pars_dataframe(data_crawl):
    dataframe_initial = pd.DataFrame(columns=data_column_names)
    for i in range(len(data_crawl)):  
        if len(data_crawl[i][1])>0:
            item_data = json.loads(data_crawl[i][1].replace('\n',''))
            for item in item_data:
                if 'ContractCode' in item.keys() and item['ContractCode'] in zaferan:
                    data_ptem = pd.DataFrame([item])
                    dataframe_initial = pd.concat([dataframe_initial, data_ptem])
    return dataframe_initial

def pars_dataframe_govahi(data_crawl):
    dataframe_initial = pd.DataFrame(columns=data_column_names)
    for i in range(len(data_crawl)):  
        if len(data_crawl[i][1])>0:
            item_data = json.loads(data_crawl[i][1].replace('\n',''))
            for item in item_data:
                if 'Code' in item.keys() and item['Code'] in zaferan_govahi:
                    data_ptem = pd.DataFrame([item])
                    dataframe_initial = pd.concat([dataframe_initial, data_ptem])

    return dataframe_initial

def wrt_to_file(data_crwl, data_type = 'tahvil'):
    try:
        #print('write_to_file')
        # parse data to write in dataframe
        data_df = pars_dataframe(data_crwl)
        #write to file
        wrt_df_csv(data_df, data_type)
    except:
        pass  
    return 

str_column_names = ',AskPrice1,AskPrice2,AskPrice3,AskPrice4,AskPrice5,AskTotalVolume,AskVolume1,AskVolume2,AskVolume3,AskVolume4,AskVolume5,AverageTradedPrice,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidTotalVolume,BidVolume1,BidVolume2,BidVolume3,BidVolume4,BidVolume5,ClosingPrice,ContractCode,ContractCurrencyDecimalPlaces,ContractCurrencyEnDesc,ContractCurrencyFaDesc,ContractDescription,ContractID,ContractSize,ContractSizeUnitEnDesc,ContractSizeUnitFaDesc,Expired,FirstTradedPrice,FirstTradedPriceChanges,FirstTradedPriceChangesPercent,FirstTradedPriceTime,HighTradedPrice,HighTradedPriceChanges,HighTradedPriceChangesPercent,InitialMargin,LastSettlementPrice,LastSettlementPriceDate,LastTradedPrice,LastTradedPriceChanges,LastTradedPriceChangesPercent,LastTradedPriceTime,LastTradingDate,LastUpdate,LowTradedPrice,LowTradedPriceChanges,LowTradedPriceChangesPercent,MaintenanceMargin,OpenInterests,OpenInterestsChanges,OpenInterestsChangesPercent,OpeningPrice,OrdersDateTime,PersianLastSettlementPriceDate,PersianOrdersDateTime,PesrsianLastTradingDate,TradesCount,TradesValue,TradesValueCurrencyEnDesc,TradesValueCurrencyFaDesc,TradesVolume,YesterdayOpenInterests'
data_column_names = str_column_names.split(',')

diagram_col = ['LastTradedPrice','LastTradedPriceTime']

connectionData_govahi = [{"name":"gavahihub"}]

async def websocketConnect_govahi():
    def startReceiving(arg):
        r = ''
        while r == '':
            try:
                r = requests.get("https://cdn.ime.co.ir/realTimeServer/start", params = wsParams)
                break
            except:
                print("Connection refused by the server..")
                #print("Let me sleep for 3 seconds")
                #print("ZZzzzz...")
                time.sleep(3)
                print("Was a nice sleep, now let me continue...")
                continue

    r = ''
    while r == '':
        try:
            r = requests.get("https://cdn.ime.co.ir/realTimeServer/negotiate", params = {
            "clientProtocol": "2.1",
            "connectionData": json.dumps(connectionData_govahi),
            })
            break
        except:
            print("Connection refused by the server..")
            #print("Let me sleep for 3 seconds")
            #print("ZZzzzz...")
            time.sleep(3)
            print("Was a nice sleep, now let me continue...")
            continue

    response = r.json()

    #print(response)
    #print(f'got connection token : {response["ConnectionToken"]}')

    wsParams = {
        "transport": "webSockets",
        "clientProtocol": "2.1",
        "connectionToken": response["ConnectionToken"],
        "connectionData": json.dumps(connectionData_govahi),
        "tid": random.randint(0,9)
    }

    websocketUri = f"wss://cdn.ime.co.ir/realTimeServer/connect?{urllib.parse.urlencode(wsParams)}"
    result = []
    global data_govahi
    #crawled_item = []
    async with websockets.connect(websocketUri, ping_interval=None) as websocket:

        #print(f'started websocket')
        thread = Thread(target = startReceiving, args = (0, ))
        thread.start()
        for i in range(0,5):
            #print("receiving")
            try:
                data = await websocket.recv()

                jsonData = json.loads(data)
                if ("M" in jsonData and len(jsonData["M"]) > 0 and "A" in jsonData["M"][0] and len(jsonData["M"][0]["A"]) > 0):
                    items = jsonData["M"][0]["A"][0]
                    if type(items) == list and len(items) > 0: 
                        result = items
                        #crawled_item = items
                        break
            except:
                print('Reconnecting')
                websocket = await websockets.connect(websocketUri, ping_interval=None)
        thread.join()

        #data_dic.append([datetime.datetime.now(), json.dumps(crawled_item, indent=4, sort_keys=True)])
        data_govahi.append([datetime.datetime.now(), json.dumps(result, indent=4, sort_keys=True)])
        #wrt_to_file([[datetime.datetime.now(), json.dumps(result, indent=4, sort_keys=True)]])

    #print('data_govahi')
    #print(len(data_govahi))
    return data_govahi

connectionData = [{"name":"marketshub"}]

async def websocketConnect():

    def startReceiving(arg):
        r = ''
        while r == '':
            try:
                r = requests.get("https://cdn.ime.co.ir/realTimeServer/start", params = wsParams)
                break
            except:
                print("Connection refused by the server..")
                #print("Let me sleep for 3 seconds")
                #print("ZZzzzz...")
                time.sleep(3)
                print("Was a nice sleep, now let me continue...")
                continue

    r = ''
    while r == '':
        try:
            r = requests.get("https://cdn.ime.co.ir/realTimeServer/negotiate", params = {
            "clientProtocol": "2.1",
            "connectionData": json.dumps(connectionData),
            })
            break
        except:
            print("Connection refused by the server..")
            #print("Let me sleep for 3 seconds")
            #print("ZZzzzz...")
            time.sleep(3)
            print("Was a nice sleep, now let me continue...")
            continue

    response = r.json()

    #print(response)
    #print(f'got connection token : {response["ConnectionToken"]}')

    wsParams = {
        "transport": "webSockets",
        "clientProtocol": "2.1",
        "connectionToken": response["ConnectionToken"],
        "connectionData": json.dumps(connectionData),
        "tid": random.randint(0,9)
    }

    websocketUri = f"wss://cdn.ime.co.ir/realTimeServer/connect?{urllib.parse.urlencode(wsParams)}"
    result = []
    global data_dic
    async with websockets.connect(websocketUri, ping_interval=None) as websocket:

        #print(f'started websocket')
        thread = Thread(target = startReceiving, args = (0, ))
        thread.start()
        for i in range(0,5):
            #print("receiving")
            try:
                data = await websocket.recv()

                jsonData = json.loads(data)
                if ("M" in jsonData and len(jsonData["M"]) > 0 and "A" in jsonData["M"][0] and len(jsonData["M"][0]["A"]) > 0):
                    items = jsonData["M"][0]["A"][0]
                    if type(items) == list and len(items) > 0: 
                        result = items
                        #crawled_item = items
                        break
            except:
                print('Reconnecting')
                websocket = await websockets.connect(websocketUri, ping_interval=None)
        thread.join()

        data_dic.append([datetime.datetime.now(), json.dumps(result, indent=4, sort_keys=True)])
    return data_dic

def scrap_data():
    asyncio.ensure_future(websocketConnect())
    #asyncio.run(websocketConnect())
    #client.send_message(data_dic, embed=e)
    #print(len(data_dic))
    return 

def wrt_to_mongo(data_type = 'tahvil'):
    global data_dic
    global data_govahi
    try:
        if data_type=='tahvil':
            data_dict_mongo = pars_dataframe_mongo(data_dic, data_type)   
        else:
            data_dict_mongo = pars_dataframe_mongo_govahi(data_govahi, data_type)  
        #print(data_dict_mongo)
        myclient = pymongo.MongoClient("mongodb://localhost:27017/", username='mongo-admin', password='password')
        mydb = myclient["admin"]
        posts = mydb.posts
        for item in data_dict_mongo:
            if '_id' in item.keys():
                del item['_id']
            post_id = posts.insert_one(item).inserted_id
        if data_type=='tahvil':
            data_dic = []  
        else:
            data_govahi = []
        print('write to mongo_db')
    except:
        print('unable to write on mongo_db')
        pass  
    return 

def pars_dataframe_mongo_govahi(data_crawl, data_type):
    data_dict_mongo = []
    for i in range(len(data_crawl)):  
        if len(data_crawl[i][1])>0:
            item_data = json.loads(data_crawl[i][1].replace('\n',''))
            for item in item_data:
                if 'Code' in item.keys() and item['Code'] in zaferan_govahi:
                    item['db_date'] = datetime.datetime.now()
                    item['type'] = 'govahi'
                    data_dict_mongo.append(item)
    return data_dict_mongo

def pars_dataframe_mongo(data_crawl, data_type):
    data_dict_mongo = []
    for i in range(len(data_crawl)):  
        if len(data_crawl[i][1])>0:
            item_data = json.loads(data_crawl[i][1].replace('\n',''))
            for item in item_data:
                if 'ContractCode' in item.keys() and item['ContractCode'] in zaferan:                    
                    item['db_date'] = datetime.datetime.now()
                    item['type'] = 'tahvil'
                    data_dict_mongo.append(item)
    return data_dict_mongo

mat = np.array([])
mat_2 = np.array([])
mat_g = np.array([])
mat_g_2 = np.array([])

data_dic  = []  
data_govahi  = []  



while(True):

    await (websocketConnect())
    await (websocketConnect_govahi())

    data_pars = pars_dataframe([data_dic[-1]])
    if len(data_pars) > 0:
        mat_append = np.array(data_pars.loc[data_pars['ContractCode'] == zaferan[0]]['LastTradedPrice'])
        mat_append_2 = np.array(data_pars.loc[data_pars['ContractCode'] == zaferan[1]]['LastTradedPrice'])
        if len(mat) == 0:
            mat = mat_append
        if len(mat_2) == 0:
            mat_2 = mat_append_2
        mat = np.append(mat, mat_append)
        mat_2 = np.append(mat_2, mat_append_2)

    data_pars_g = pars_dataframe_govahi([data_govahi[-1]])
    if len(data_pars_g) > 0:
        mat_append_g = np.array(data_pars_g.loc[data_pars_g['Code'] == 'IRK1A0389983']['FinalPrice'])
        mat_append_g_2 = np.array(data_pars_g.loc[data_pars_g['Code'] == 'IRK1A0449985']['FinalPrice'])
        if len(mat_g) == 0:
            mat_g = mat_append_g
        if len(mat_g_2) == 0:
            mat_g_2 = mat_append_g_2
        mat_g = np.append(mat_g, mat_append_g)
        mat_g_2 = np.append(mat_g_2, mat_append_g_2)

    print('...')
    print(len(data_dic))
    print(len(data_govahi))
    if len(data_dic)>2:
        wrt_to_mongo('tahvil')

    if len(data_govahi)>2:
        wrt_to_mongo('govahi')

我运行时得到的完整错误systemctl status python-script是:

   Loaded: loaded (/etc/systemd/system/python-script.service; enabled; vendor preset: disabled)
   Active: failed (Result: exit-code) since Sat 2020-05-16 01:11:50 +0430; 15h ago
 Main PID: 854 (code=exited, status=1/FAILURE)

May 16 01:11:43 boiga.server1.more.com systemd[1]: Started Python Script Serv...
May 16 01:11:50 boiga.server1.more.com python3[854]: Traceback (most recent c...
May 16 01:11:50 boiga.server1.more.com python3[854]: File "/root/script.py", ...
May 16 01:11:50 boiga.server1.more.com python3[854]: await (websocketConnect())
May 16 01:11:50 boiga.server1.more.com python3[854]: NameError: name 'await' ...
May 16 01:11:50 boiga.server1.more.com systemd[1]: python-script.service: mai...
May 16 01:11:50 boiga.server1.more.com systemd[1]: Unit python-script.service...
May 16 01:11:50 boiga.server1.more.com systemd[1]: python-script.service failed.
Hint: Some lines were ellipsized, use -l to show in full.

编辑:这是我为我的脚本创建的服务:

[Unit]
Description=Python Script Service
After=network.target

[Service]
Type=simple
User=root
ExecStart=/usr/bin/python3 /root/script.py
Restart=on-abort


[Install]
WantedBy=multi-user.target

EDIT2:我在我的服务代码中更改python3python,但我得到的新错误是:

● python-script.service - Python Script Service
   Loaded: loaded (/etc/systemd/system/python-script.service; enabled; vendor preset: disabled)
   Active: failed (Result: exit-code) since Sun 2020-05-17 17:52:36 +0430; 5min ago
  Process: 850 ExecStart=/usr/bin/python /root/script.py (code=exited, status=1/FAILURE)
 Main PID: 850 (code=exited, status=1/FAILURE)

May 17 17:52:36 boiga.server1.more.com systemd[1]: Started Python Script Service.
May 17 17:52:36 boiga.server1.more.com python[850]: File "/root/script.py", line 84
May 17 17:52:36 boiga.server1.more.com python[850]: async def websocketConnect_govahi():
May 17 17:52:36 boiga.server1.more.com python[850]: ^
May 17 17:52:36 boiga.server1.more.com python[850]: SyntaxError: invalid syntax
May 17 17:52:36 boiga.server1.more.com systemd[1]: python-script.service: main process exited, code=exited, status=1/FAILURE
May 17 17:52:36 boiga.server1.more.com systemd[1]: Unit python-script.service entered failed state.
May 17 17:52:36 boiga.server1.more.com systemd[1]: python-script.service failed.

标签: pythonserviceasync-awaitnameerror

解决方案


推荐阅读