asynchronous - 如何在 Erlang 中进行并行调用并等待所有结果?
问题描述
我正在使用 Erlang 开发手机游戏后端。对于每个 HTTP 请求,它可能需要查询不同的数据源,例如 PostgreSQL、MongoDB 和 Redis。我想并行地对这些数据源进行独立调用,但找不到明确的 Erlang 方法。
例如,
handle_request(?POST, <<"login">>, UserId, Token) ->
% Verify token from PostgreSQL
AuthResult = auth_service:login(UserId, Token),
% Get user data such as outfits and level from MongoDB
UserDataResult = userdata_service:get(UserId),
% Get messages sent to the user from Redis
MessageResult = message_service:get(UserId),
% How to run the above 3 calls in parallel?
% Then wait for all their results here?
% Combine the result and send back to client
build_response(AuthResult, UserDataResult, MessageResult).
每个服务最终都会调用相应的数据驱动程序(epgsql、eredis、mongo_erlang),最终会出现一些 pooboy:transaction 和 gen_server:call。如何设计这些服务模块也尚未确定。
我想确保上面的 3 个数据调用可以并行运行,然后 handle_request 函数等待所有这 3 个调用完成,然后调用 build_response。我怎么能正确地做到这一点?
作为参考,在 NodeJS 中,我可能会这样做
var authPromise = AuthService.login(user_id, token);
var userDataPromise = UserdataService.get(user_id);
var messagePromise = MessageService.get(user_id);
Promise.all(authPromise, userDataPromise, messagePromise).then( function(values) {
buildResponse(values);
}
在 Scala 中,我可能会这样做
val authFuture = AuthService.login(userId, token)
val userDataFuture = UserdataService.get(userId)
val messageFuture = MessageService.get(userId)
for {
auth <- authFuture
userData <- userDataFuture
message <- messageFuture
} yield ( buildResponse(auth, userData, message )
显然,我认为这个问题是一个承诺/未来/收益问题。但是有人告诉我,如果我在 Erlang 中寻找 Promise,我可能会走错方向。Erlang 实现这一目标的最佳实践是什么?
解决方案
如何在 Erlang 中进行并行调用并等待所有结果?
您可以使用堆叠的接收子句。Erlang 将在接收子句中永远等待,直到消息从进程到达(或者您可以使用 指定超时after
)——这类似于在 nodejs中等待承诺:
-module(my).
-compile(export_all).
all_results() ->
Pid1 = spawn(?MODULE, getdata1, [self(), {10, 20}]),
Pid2 = spawn(?MODULE, getdata2, [self(), 30]),
Pid3 = spawn(?MODULE, getdata3, [self()]),
[receive {Pid1, Result1} -> Result1 end,
receive {Pid2, Result2} -> Result2 end,
receive {Pid3, Result3} -> Result3 end].
getdata1(From, {X, Y}) ->
%% mimic the time it takes to retrieve the data:
SleepTime = rand:uniform(100),
io:format("Sleeping for ~w milliseconds~n", [SleepTime]),
timer:sleep(SleepTime),
From ! {self(), X+Y}. %% send the data back to the main process
getdata2(From, Z) ->
SleepTime = rand:uniform(100),
io:format("Sleeping for ~w milliseconds~n", [SleepTime]),
timer:sleep(SleepTime),
From ! {self(), Z+1}.
getdata3(From) ->
SleepTime = rand:uniform(100),
io:format("Sleeping for ~w milliseconds~n", [SleepTime]),
timer:sleep(SleepTime),
From ! {self(), 16}.
请注意,此代码:
[receive {Pid1, Result1} -> Result1 end,
receive {Pid2, Result2} -> Result2 end,
receive {Pid3, Result3} -> Result3 end].
相当于:
R1 = receive {Pid1, Result1} ->
Result1
end,
R2 = receive {Pid2, Result2} ->
Result2
end,
R3 = receive {Pid3, Result3} ->
Result3
end,
[R1, R2, R3].
在外壳中:
~/erlang_programs$ erl
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V9.3 (abort with ^G)
1> c(my).
my.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,my}
2> timer:tc(my, all_results, []).
Sleeping for 66 milliseconds
Sleeping for 16 milliseconds
Sleeping for 93 milliseconds
{96356,[30,31,16]}
3> timer:tc(my, all_results, []).
Sleeping for 57 milliseconds
Sleeping for 30 milliseconds
Sleeping for 99 milliseconds
{100153,[30,31,16]}
4> timer:tc(my, all_results, []).
Sleeping for 66 milliseconds
Sleeping for 31 milliseconds
Sleeping for 24 milliseconds
{66426,[30,31,16]}
timer:tc()以微秒(1,000 微秒 = 1 毫秒)为单位返回函数执行所需的时间以及函数的返回值。例如,第一次all_results()
调用需要 96.4 毫秒才能完成,而如果按顺序执行,单个进程将需要 66+16+93=175+ 毫秒才能完成。
推荐阅读
- css - 我已经为我的倒计时时钟编辑了 CSS,但它没有显示
- ruby-on-rails - Spina CMS 路由设置默认登陆页面
- java - Java.lang.ClassnotfoundException hadoop
- pandas - Colab 内核因未知原因重启
- pointers - 如何更改在golang中作为结构引用传递的空接口的值?
- gtk - 如何像 Gedit 中的打开菜单一样创建菜单?
- python - 使用 Python 打开 Excel 文件时如何输入密码?
- c# - 如何为字符串赋值并在for循环中计算
- python - Python pptx - 创建具有多个类别的图表?
- vue.js - Vue - 数组推送中的数组推送