python - 如何以编程方式关闭 dumpmaster (mitmproxy)?
问题描述
我正在使用 mitmproxy 来拦截来自我的移动设备的一些请求/响应。我有代码块来启动它,如下所示:
import asyncio
from mitmproxy import proxy, options, ctx
from mitmproxy.tools.dump import DumpMaster
from mitmproxy import http
class AdjustBody:
def response(self, flow: http.HTTPFlow) -> None:
if "google" in flow.request.url:
print("Before intercept: %s" % flow.response.text)
flow.response.content = bytes("This is replacement response", "UTF-8")
print("After intercept: %s" % flow.response.text)
def start():
add_on = AdjustBody()
opts = options.Options(listen_host='192.168.1.224', listen_port=8888, confdir="/Users/hienphan/.mitmproxy")
proxy_conf = proxy.config.ProxyConfig(opts)
dump_master = DumpMaster(opts)
dump_master.server = proxy.server.ProxyServer(proxy_conf)
dump_master.addons.add(add_on)
try:
asyncio.ensure_future(stop())
dump_master.run()
except KeyboardInterrupt:
dump_master.shutdown()
async def stop():
# Sleep 10s to do intercept
await asyncio.sleep(10)
ctx.master.shutdown()
start()
我可以正常启动它,但它是一个 run_forever() 事件循环。然后我不知道如何以编程方式停止它。我在这里尝试的只是在关闭它之前睡 10 秒做我想做的事。在关闭代理之前有什么方法可以等待我的拦截完成?
解决方案
import asyncio
import os
import signal
from mitmproxy import proxy, options
from mitmproxy.tools.dump import DumpMaster
from mitmproxy.http import HTTPFlow
class AddHeader:
def __init__(self):
self.num = 0
def response(self, flow: HTTPFlow):
self.num = self.num + 1
flow.response.headers["count"] = str(self.num)
addons = [
AddHeader()
]
opts = options.Options(listen_host='0.0.0.0', listen_port=8080)
pconf = proxy.config.ProxyConfig(opts)
m = DumpMaster(opts)
m.server = proxy.server.ProxyServer(pconf)
m.addons.add(*addons)
try:
loop = asyncio.get_event_loop()
try:
loop.add_signal_handler(signal.SIGINT, getattr(m, "prompt_for_exit", m.shutdown))
loop.add_signal_handler(signal.SIGTERM, m.shutdown)
except NotImplementedError:
# Not supported on Windows
pass
# Make sure that we catch KeyboardInterrupts on Windows.
# https://stackoverflow.com/a/36925722/934719
if os.name == "nt":
async def wakeup():
while True:
await asyncio.sleep(0.2)
asyncio.ensure_future(wakeup())
m.run()
except (KeyboardInterrupt, RuntimeError):
pass
推荐阅读
- python - For Loop Pandas 中的滚动过滤器
- google-sheets - GoogleSheet:从两个表中交叉数据
- java - 了解 Java 使用 Arrays.sort() 对二维数组进行排序
- django - 现有用户的 Django-allauth 手动链接
- flutter - 如何在子小部件中定义 Navigator.pop() 响应类型
- java - 如何在 mongo-template 中为 $in 聚合编写 mongo cli 查询
- discord.py - 当我们传递命名参数时出现 discord.py 错误
- python - 如何在不打开文件的情况下初始化 Python file_object
- github - 如何在 sonarqube 中分析来自 GitHub 存储库的代码
- ssl - 我可以生成我的公钥和私钥对,其中我的公钥是公开信任的(CA 认证)