python - 如何使用 pytest 测试烧瓶应用程序?
问题描述
嘿伙计们,我是单元测试的新手,老实说,即使遵循 The Testing Flask Applications 文档https://flask .palletsprojects.com/en/1.1.x/testing/
我正在建立一个幻想交易股票,我需要对交易进行单元测试。但是,我正在按照测试烧瓶应用程序文档连接 pytest 并开始测试 status_code == 200 如果用户导航到所述路由或发出请求(发布、获取等)。下面是我的结构服务器目录我有任何资源或想法如何开始测试路线或运行 pytest。我会留下每次运行 test_transactions.py 时遇到的错误
注意:我的服务器目录是 Package Structured
错误:
========================================================== FAILURES ==========================================================
______________________________________________________ test_transaction ______________________________________________________
client = <FlaskClient <Flask 'server_files'>>
def test_transaction(client):
> landing = client.get('http://localhost:5000/multiple_stocks')
test_transactions.py:13:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
env\lib\site-packages\werkzeug\test.py:1006: in get
return self.open(*args, **kw)
env\lib\site-packages\flask\testing.py:222: in open
return Client.open(
env\lib\site-packages\werkzeug\test.py:970: in open
response = self.run_wsgi_app(environ.copy(), buffered=buffered)
env\lib\site-packages\werkzeug\test.py:861: in run_wsgi_app
rv = run_wsgi_app(self.application, environ, buffered=buffered)
env\lib\site-packages\werkzeug\test.py:1096: in run_wsgi_app
app_rv = app(environ, start_response)
env\lib\site-packages\flask\app.py:2464: in __call__
return self.wsgi_app(environ, start_response)
env\lib\site-packages\flask\app.py:2450: in wsgi_app
response = self.handle_exception(e)
env\lib\site-packages\flask\app.py:1867: in handle_exception
reraise(exc_type, exc_value, tb)
env\lib\site-packages\flask\_compat.py:39: in reraise
raise value
env\lib\site-packages\flask\app.py:2447: in wsgi_app
response = self.full_dispatch_request()
env\lib\site-packages\flask\app.py:1952: in full_dispatch_request
rv = self.handle_user_exception(e)
env\lib\site-packages\flask\app.py:1821: in handle_user_exception
reraise(exc_type, exc_value, tb)
env\lib\site-packages\flask\_compat.py:39: in reraise
raise value
env\lib\site-packages\flask\app.py:1950: in full_dispatch_request
rv = self.dispatch_request()
env\lib\site-packages\flask\app.py:1936: in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
server_files\routes.py:31: in multiple
resp = req.json()
env\lib\site-packages\requests\models.py:900: in json
return complexjson.loads(self.text, **kwargs)
C:\Python39\lib\json\__init__.py:346: in loads
return _default_decoder.decode(s)
C:\Python39\lib\json\decoder.py:337: in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <json.decoder.JSONDecoder object at 0x00000238C4AF11C0>, s = 'The API key provided is not valid.', idx = 0
def raw_decode(self, s, idx=0):
"""Decode a JSON document from ``s`` (a ``str`` beginning with
a JSON document) and return a 2-tuple of the Python
representation and the index in ``s`` where the document ended.
This can be used to decode a JSON document from a string that may
have extraneous data at the end.
"""
try:
obj, end = self.scan_once(s, idx)
except StopIteration as err:
> raise JSONDecodeError("Expecting value", s, err.value) from None
E json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
C:\Python39\lib\json\decoder.py:355: JSONDecodeError
================================================== short test summary info ===================================================
FAILED test_transactions.py::test_transaction - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
===================================================== 1 failed in 0.42s ======================================================
server
_______ app.py
_______ server_files
_______ __init__.py
_______ models.py
_______routes.py
_______ test_transactions.py
应用程序.py
import os
import pytest
from server_files import app
if __name__ == '__main__':
app.run(host='127.0.0.1', port=int(os.environ.get("PORT", 5000)))
test_transactions.py
try:
import pytest
import requests
from server_files import client
# from server_files.routes import configure_app
# from server_files.routes import add_stoc
print("unittest line 5 {}".format(client))
except Exception as e:
print("Some Modules are missings {}".format(e))
def test_transaction(client):
landing = client.get('http://localhost:5000/multiple_stocks')
response = landing.data
assert response == 200
if __name__ == '__main__':
pytest.main()
初始化.py
import pytest
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_login import LoginManager
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:1209lmc@localhost/fantasy_stock_app'
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
db = SQLAlchemy(app)
from server_files import routes
@pytest.fixture
def client():
# app.testing = True
app.config['TESTING'] = True
client = app.test_client()
return client
路线.py
import os
import json
import base64
import requests
from datetime import datetime
from flask import jsonify, request, render_template, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash
from server_files import app, db, bcrypt
from server_files.models import Users, Transactions, Stock
api_key = os.environ.get('API_KEY')
base_url = "https://cloud.iexapis.com"
@app.route("/multiple_stocks", methods=["GET"])
def multiple():
tesla = "tsla"
apple = "aapl"
fb = "fb"
qcom = "qcom"
microsft = "msft"
sony = 'sne'
american_airline = "aal"
search_url = "{}/stable/stock/market/batch?symbols={},{},{},{},{},{},{}&types=quote&token={}".format(
base_url, tesla, apple, fb, qcom, microsft, sony, american_airline, api_key)
req = requests.get(search_url)
resp = req.json()
stocks_data = [
{
"company_name": resp['AAPL']["quote"]["companyName"],
"symbol": resp['AAPL']["quote"]["symbol"],
"latestPrice": resp['AAPL']["quote"]["latestPrice"],
"change": resp['AAPL']["quote"]["change"],
},
{
"company_name": resp['TSLA']["quote"]["companyName"],
"symbol": resp['TSLA']["quote"]["symbol"],
"latestPrice": resp['TSLA']["quote"]["latestPrice"],
"change": resp['TSLA']["quote"]["change"]
},
{
"company_name": resp['FB']["quote"]["companyName"],
"symbol": resp['FB']["quote"]["symbol"],
"latestPrice": resp['FB']["quote"]["latestPrice"],
"change": resp['FB']["quote"]["change"]
},
{
"company_name": resp['AAL']["quote"]["companyName"],
"symbol": resp['AAL']["quote"]["symbol"],
"latestPrice": resp['AAL']["quote"]["latestPrice"],
"change": resp['AAL']["quote"]["change"]
},
{
"company_name": resp['MSFT']["quote"]["companyName"],
"symbol": resp['MSFT']["quote"]["symbol"],
"latestPrice": resp['MSFT']["quote"]["latestPrice"],
"change": resp['MSFT']["quote"]["change"]
},
{
"company_name": resp['QCOM']["quote"]["companyName"],
"symbol": resp['QCOM']["quote"]["symbol"],
"latestPrice": resp['QCOM']["quote"]["latestPrice"],
"change": resp['QCOM']["quote"]["change"]
},
{
"company_name": resp['SNE']["quote"]["companyName"],
"symbol": resp['SNE']["quote"]["symbol"],
"latestPrice": resp['SNE']["quote"]["latestPrice"],
"change": resp['SNE']["quote"]["change"]
},
]
return jsonify({"data": stocks_data})
@app.route("/search_stock/<string:stock>", methods={'GET'})
def search_stock(stock):
search_url = "{}/stable/stock/{}/quote?token={}".format(
base_url, stock, api_key)
req = requests.get(search_url)
resp = req.json()
stock_data = {
"company_name": resp["companyName"],
"cost": resp["latestPrice"],
"change": resp["change"],
"symbol": resp["symbol"]
}
return jsonify({"data": stock_data})
@app.route('/add_stock', methods=['POST'])
def add_stock():
user_detail = request.get_json()
print(user_detail)
user = Users.query.filter_by(id=user_detail['id']).first()
if user:
filter_by_stock_symbol = Stock.query.filter_by(
stock_symbol=user_detail['stockSymbol']).first()
if filter_by_stock_symbol != None:
print("line 115 {}".format(filter_by_stock_symbol))
update_user_cost = (filter_by_stock_symbol.user_estimated_cost +
user_detail['estimatedCost'])
update_user_shares = (filter_by_stock_symbol.user_estimated_shares +
user_detail['estimatedShares'])
update_user_holdings = user.user_holdings - update_user_cost
filter_by_stock_symbol.user_estimated_cost = update_user_cost
filter_by_stock_symbol.user_estimated_shares = update_user_shares
user.user_holdings = update_user_holdings
print("line 125 {}".format(user.user_holdings))
transaction = Transactions(company_name=user_detail['company_name'], user_estimated_cost=user_detail['estimatedCost'],
user_holdings=user_detail['estimatedCost'], user_id=user_detail['id'])
db.session.add(transaction)
db.session.commit()
return "Success! Stock added to db", 200
else:
print("line 131 {}".format(filter_by_stock_symbol))
user_holdings = user.user_holdings - user_detail['estimatedCost']
user.user_holdings = user_holdings
"""
Transactions model column user_holdings
has to be the amount of $ the user currently has after
each transaction is complete.
Leaving as is, working on the client side first
"""
user_stock = Stock(company_name=user_detail['company_name'],
stock_symbol=user_detail['stockSymbol'], stock_cost=user_detail['stockCost'],
user_estimated_shares=user_detail['estimatedShares'], user_estimated_cost=user_detail['estimatedCost'], user_id=user_detail['id'])
transaction = Transactions(company_name=user_detail['company_name'], user_estimated_cost=user_detail[
'estimatedCost'], user_holdings=user_holdings, user_id=user_detail['id'])
db.session.add(user_stock)
db.session.add(transaction)
db.session.commit()
return jsonify("Success! Stock in db updated!", 200)
else:
return jsonify('Something went wrong on our end! Please try again later.', 500)
@app.route('/sell_stock', methods=["POST"])
def sell_stock():
user_detail = request.get_json()
user = Users.query.filter_by(id=user_detail['id']).first()
filter_by_stock = Stock.query.filter_by(
stock_symbol=user_detail['stockSymbol']).first()
search_stock = "{}/stable/stock/{}/quote?token={}".format(
base_url, user_detail['stockSymbol'], api_key)
req = requests.get(search_stock)
resp = req.json()
user_estimated_shares = filter_by_stock.user_estimated_shares
actual_stock_cost = resp['latestPrice']
user_selling_ammout = user_detail['userSellingAmount']
stock_bought_at = filter_by_stock.stock_cost
difference_in_cost = (actual_stock_cost -
stock_bought_at) * user_estimated_shares
difference_in_shares = (filter_by_stock.user_estimated_cost -
user_selling_ammout) / filter_by_stock.stock_cost
user_holdings = user.user_holdings + \
difference_in_cost + user_detail['userSellingAmount']
print("line 185 {}".format(user_holdings))
print("line 186 user.user_holdings{} user detail selling amount {}".format(
user.user_holdings, user_detail['userSellingAmount']))
if user:
filter_by_stock.user_estimated_cost = filter_by_stock.user_estimated_cost - \
user_selling_ammout
filter_by_stock.user_estimated_shares = difference_in_shares
user.user_holdings = user_holdings
if filter_by_stock.user_estimated_cost == 0:
transaction = Transactions(company_name=user_detail['companyName'], user_estimated_cost=user_detail[
'userSellingAmount'], user_holdings=user_holdings, user_id=user_detail['id'])
stock = Stock.query.filter_by(
stock_symbol=user_detail['stockSymbol']).delete()
db.session.add(transaction)
db.session.commit()
print("deleted stock {}".format(user_detail['stockSymbol']))
else:
print("updated stock {}".format(user_detail['stockSymbol']))
transaction = Transactions(company_name=user_detail['companyName'], user_estimated_cost=user_detail[
'userSellingAmount'], user_holdings=user_holdings, user_id=user_detail['id'])
db.session.add(transaction)
db.session.commit()
return "ok", 200
else:
return 'nope', 500
@app.route('/user_stock', methods=['POST'])
def user_stock():
user_detail = request.get_json()
user = Users.query.filter_by(id=user_detail['id']).first()
stock = Stock.query.filter_by(user_id=user_detail['id']).all()
stock_list = []
if user:
for data in stock:
search_url = "{}/stable/stock/{}/quote?token={}".format(
base_url, data.stock_symbol, api_key)
req = requests.get(search_url)
resp = req.json()
difference_in_cost = (
resp['latestPrice'] - data.stock_cost) * data.user_estimated_shares
print(resp['symbol'])
print(resp['latestPrice'])
print('difference in cost {}'.format(difference_in_cost))
stock_obj = {
"companyName": data.company_name,
"symbol": data.stock_symbol,
"cost": data.stock_cost,
"userEstimatedShares": data.user_estimated_shares,
"userEstimatedHolding": data.user_estimated_cost,
"differenceInCost": difference_in_cost
}
stock_list.append(stock_obj)
if stock_list != '':
return jsonify({"stock": stock_list})
else:
return jsonify("An issue has occurred on our end! Please try again later", 500)
else:
return jsonify('User not found in our record! You will be redirected to the home page.', 500)
@app.route('/signup', methods=["POST"])
def signup():
user_details = request.get_json()
filter_user_model_by_username = Users.query.filter_by(
username=user_details['username']).first()
hashed_password = bcrypt.generate_password_hash(
user_details['password']).decode('utf-8')
if filter_user_model_by_username is None:
user = Users(first_name=user_details['first_name'], last_name=user_details['last_name'],
email=user_details['email'], username=user_details['username'], password=hashed_password, user_holdings=user_details['userHoldings'])
db.session.add(user)
db.session.commit()
return jsonify("Success! You will be redirect to your account shortly!", user.id, 200)
else:
return jsonify("The username has already been used! Please choose another username!", 500)
@app.route('/login', methods=["POST"])
def login():
user_details = request.get_json()
user = Users.query.filter_by(username=user_details["username"]).first()
print(user.username)
if user and bcrypt.check_password_hash(user.password, user_details['password']):
response = {
"user_id": user.id,
"success_msg": "You are logged in successfully! You will be redirect to your account shortly!",
"username": user.username
}
return jsonify(response, 200)
else:
return jsonify("hmmm.. We don't recognize that username or password. Please try again!", 500)
@app.route('/user', methods=["POST"])
def user():
user_detail = request.get_json()
user = Users.query.filter_by(id=user_detail['id']).first()
user_obj = {
"username": user.username,
"user_holdings": user.user_holdings
}
if user:
return jsonify(user_obj)
else:
return "User not found!", 500
解决方案
推荐阅读
- java - 在 for 循环中使用 Hibernate save 方法将对象持久保存在数据库中的方法
- javascript - 来自 BackHandler 的 React Native setState 未按预期工作
- python - 无法 dlopen 库“libcudnn.so.7”;dlerror:libcudnn.so.7:LD_LIBRARY_PATH:/usr/local/cuda-10.0/lib64:
- python - 修改使用 django 框架创建的注册选项卡
- python - PermissionError(13, 'Permission denied') 从虚拟主机服务上的 Python 脚本导入熊猫时
- matlab - 在matlab中使用normrand进行高斯分布是真的吗
- c# - 如何在 startup.cs 中添加 /.well-known/openid-configuration 路由以及控制器路由和角度路由?
- bash - 如何订阅文件创建事件?(MacOS,Linux)
- java - 连接未关闭
- python - Discord py forward messages(with embeds) sent on one channel to another and how to determine number of members in a server?