python - 以编程方式构建代理时,无法使用 webview 向浮士德代理/主题发送消息
问题描述
我正在尝试以编程方式构建代理,并且一直在关注这个示例。当我查看在faust -A <> agents
脚本开始时生成的主题和代理时,我可以看到它们。代理名称类似于{topic}_agent
示例。所有这些代理分别接触到一些 rest api。这是代码。
def create_agent(next_topic, model_metadata: Dict):
"""
creation of a single agent.
`start_topic`: str
Just a string that you can use in other functions
to figure out how messages in that topic can be
transformed
`next_topic`: faust.topics.Topic
A faust `app.topic` instance
"""
async def agent(stream):
"""Call domino model and return response
"""
async for message in stream:
domino_req = {"data": message.asdict()}
app.logger.info(domino_req)
response = requests.post(
model_metadata['url'],
auth=(
model_metadata['auth'],
model_metadata['auth'],
),
json=domino_req,
)
if response.status_code == 200:
answer = response.json()
await next_topic.send(answer['result'])
else:
app.logger.error(response.reason)
# app.logger.info(f"NEW Agent Created: ## Agent - {consumer} ##")
return agent
def agents(registry_response):
"""
configuration of multiple agents
"""
agents = []
for key, value in registry_response.items():
""" `topic`: app.topic instance """
agents.append(
# `topic.start`: str
# `topic.next`: faust.topics.Topic
(create_agent(next_topic= all_responses, model_metadata = value),key)
)
return agents
def attach_agent(agent, topic):
""" Attach the agent to the Faust app """
# `topic.faust`: faust.topics.Topic
# it is equivalent to `app.topic(topic.start)`
print("hello")
app.agent(channel=app.topic(topic), name=f"{topic}_agent")(agent)
# new_agent.start()
# app.logger.info("hello")
# app.logger.info(new_agent)
# app.logger.info(new_agent.info())
# app.logger.info(app.agents)
# @app.task
# async def get_model_registry():
# """
# Create topics and agents for the initial set of models present in the registry
# """
app.logger.info('APP STARTED')
app.logger.info('Fetching Models from Model Registry')
#TODO: Call the Model Registry and process it
#Just mocking the registry
registry_response = initial_model_registry_metadata
app.logger.info(f'Number of Models Found {len(registry_response)}')
for agent,topic in agents(registry_response):
attach_agent(agent, topic)
@app.page('/model/{key_ai_model}')
class frontdoor(View):
async def get(self, request: Request) -> Response:
return self.json({'key_ai_model': key_ai_model})
async def post(self, request: Request, key_ai_model: str) -> Response:
request_id = str(uuid.uuid4())
src = await request.json()
msg = GdeltRequest(**src)
app.logger.info(msg)
await gdelt_agent.cast(msg, key=request_id)
return self.json({'request_id': request_id, 'key_ai_model': key_ai_model})
@app.agent(all_responses)
async def print_responses(stream):
async for message in stream:
print(message)
但是,当我发布一些东西时,我收到了一个错误 - NameError: name 'gdelt_agent' is not defined
。这里的任何帮助将不胜感激。
解决方案
我已经能够通过将代理分配为字典中的值并稍后使用它将消息投射到代理来解决这个问题。
agents_dict = {}
topics_dict ={}
def attach_agent(agent, topic):
""" Attach the agent to the Faust app """
# `topic.faust`: faust.topics.Topic
# it is equivalent to `app.topic(topic.start)`
print("hello")
topics_dict[topic] = app.topic(topic)
agents_dict[topic] = app.agent(channel=topics_dict[topic], name=f"{topic}_agent")(agent)
agents_dict[topic].start()
谢谢大家。
推荐阅读
- c - 编译时出错:“二进制/的无效操作数(具有'short int *'和'int')”
- c++ - c++:为什么我不能为类“内部”的非常量静态成员赋值?
- visual-studio - MSBuild - 如何包含/排除目录?
- r - 无法在 R 中使用 rmarkdown 创建浮动目录
- angular - Angular Jquery Datatable 渲染按钮,单击事件不起作用
- solr - DSE 6.7 solr 搜索总是返回空
- powershell - PowerShell 访问按枚举逐项列出的多维数组
- office-js - Mac 上的 Outlook 2019 中缺少 SeriesId
- wordpress - 为什么将 SSL 域添加到 WordPress 多站点会导致:“潜在的安全风险”警告
- java - 根据集合中的值过滤列表中的值