mysql - 如何在 R 中使用 Sequel Pro 从 MySQL 中并行获取数据
问题描述
我想在 R 中使用 seqlpro 从 mysql 获取数据,但是当我运行查询时需要很长时间。这是我的代码:
old_value<- data.frame()
new_value<- data.frame()
counter<- 0
for (i in 1:length(short_list$id)) {
mydb = OpenConn(dbname = '**', user = '**', password = '**', host = '**')
query <- paste0("select * from table where id IN (",short_list$id[i],") and country IN ('",short_list$country[i],"') and date >= '2019-04-31' and `date` <= '2020-09-1';", sep = "" )
temp_old <- RMySQL::dbFetch(RMySQL::dbSendQuery(mydb, query), n = -1
query <- paste0("select * from table2 where id IN (",short_list$id[i],") and country IN ('",short_list$country[i],"') and date >= '2019-04-31' and `date` <= '2020-09-1';", sep = "" )
temp_new <- RMySQL::dbFetch(RMySQL::dbSendQuery(mydb, query), n = -1)
RMySQL::dbDisconnect(mydb)
new_value<- rbind(temp_new,new_value)
old_value<- rbind(temp_old,old_value)
counter=counter+1
base::print(paste("completed for ",counter),sep="")
}
有什么方法可以让我写得更高效并更快地调用查询,因为我有大约 5000 行应该进入循环。实际上,此查询有效,但需要时间。
我已经尝试过了,但它仍然给我错误:
#parralel computing
clust <- makeCluster(length(6))
clusterEvalQ(cl = clust, expr = lapply(c('data.table',"RMySQL","dplyr","plyr"), library, character.only = TRUE))
clusterExport(cl = clust, c('config','short_list'), envir = environment())
new_de <- parLapply(clust, short_list, function(id,country) {
for (i in 1:length(short_list$id)) {
mydb = OpenConn(dbname = '*', user = '*', password = '*', host = '**')
query <- paste0("select * from table1 where id IN (",short_list$id[i],") and country IN ('",short_list$country[i],"') and source_event_date >= date >= '2019-04-31' and `date` <= '2020-09-1';", sep = "" )
temp_data <- RMySQL::dbFetch(RMySQL::dbSendQuery(mydb, query), n = -1) %>% data.table::data.table()
RMySQL::dbDisconnect(mydb)
return(temp_data)}
})
stopCluster(clust)
gc(reset = T)
new_de <- data.table::rbindlist(new_de, use.names = TRUE)
我还定义了 short_list 列表如下:
short_list<- as.list(short_list)
里面 short_list 是:
id. country
2 US
3 UK
... ...
但是它给了我这个错误:
checkForRemoteErrors(val) 中的错误:一个节点产生错误:找不到对象“i”
但是,当我从 id[i] 和 country[i] 中删除 i 时,它只会给我第一行结果,而不是获得所有 id 和国家/地区结果。
解决方案
我认为另一种方法是将您需要的 id 上传到临时表中,然后一次查询所有内容。
tmptable <- "mytemptable"
dbWriteTable(conn, tmptable, short_list, create = TRUE)
alldat <- dbGetQuery(conn, paste("
select t1.*
from ", tmptable, " tmp
left join table1 t1 on tmp.id=t1.id and tmp.country=t1.country
where t1.`date` >= '2019-04-31' and t1.`date` <= '2020-09-1'"))
dbExecute(conn, paste("drop table", tmptable))
(许多 DBMS 使用前导#
来指示仅对本地用户可见的临时表,在模式命名空间中不太可能发生冲突,并且在连接关闭时自动清理。我通常鼓励在此处使用临时表,请在此处与您的数据库文档、架构和/或 DBA 联系以获取更多信息。)
表的顺序很重要:通过拉取所有from mytemptable
然后left join table1
放到它上面,我们有效地过滤掉了table1
不包含匹配的任何数据id
and country
。
这并不能解决数据下载的速度问题,但对此有一些想法:
- 每次您遍历查询时,您的开销都不是微不足道的;如果有很多数据,那么这个开销应该不会很大,但它仍然存在。使用单个查询将显着减少这种开销。
- 查询时间也可能受到表上任何索引(ices)的影响。超出了本次讨论的范围,但如果您有一个很大的表,则可能是相关的。如果表没有被有效地索引(或者查询的结构没有很好地使用这些索引),那么每个查询将花费有限的时间来“编译”并返回数据。同样,通过单个更高效的查询将减少开销。
- 大型查询可能会受益于使用命令行工具
mysql
;它几乎与您将获得的一样快,并且可能会解决RMySQL
和/或中的任何问题DBI
。(我并不是说它们效率低下,但是......免费的开源驱动程序不太可能比 MySQL 自己的命令行实用程序更快。
至于并行执行此操作...
你使用
parLapply
不正确。它接受单个向量/列表并迭代该列表中的每个对象。您可以使用它迭代帧的索引,但不能使用它迭代该帧中的多个列。这与 base R 完全一样lapply
。让我们看看当你打电话时发生了什么。我将其替换为
lapply
(因为在多个进程中进行调试很困难)。# parLapply(clust, mtcars, function(id, country) { ... }) lapply(mtcars, function(id, country) { browser(); 1; }) # Called from: FUN(X[[i]], ...) debug at #1: [1] 1 id # [1] 21.0 21.0 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2 17.8 16.4 17.3 15.2 10.4 10.4 14.7 32.4 30.4 33.9 21.5 15.5 15.2 # [24] 13.3 19.2 27.3 26.0 30.4 15.8 19.7 15.0 21.4 country # Error: argument "country" is missing, with no default
因为参数(
mtcars
在这里,short_list
在你的)是 adata.frame
,因为它是一个list
-like 对象,lapply
(andparLapply
)一次对每一列进行操作。您希望它将“解压缩”数据,将第一列的id
值应用于country
. 实际上,这是一个执行此操作的函数:(Map
和并行的clusterMap
,正如我在评论中所建议的那样)。稍后再谈。并行化事物的目的是不使用
for
并行函数内部的循环。如果short_list
有 10 行,并且如果您的使用parLapply
是正确的,那么您将查询所有行 10 次,使您的问题明显更糟。在伪代码中,您将执行以下操作:parallelize for each row in short_list: # this portion is run simultaneously in 10 difference processes/threads for each row in short_list: query for data related to this row
两种选择:
提供一个参数来
parLapply
表示框架的行。new_de <- new_de <- parLapply(clust, seqlen(NROW(short_list)), function(rownum) { mydb = OpenConn(dbname = '*', user = '*', password = '*', host = '**') on.exit({ DBI::dbDisconnect(mydb) }) tryCatch( DBI::dbGetQuery(mydb, " select * from table1 where id=? and country=? and source_event_date >= date >= '2019-04-31' and `date` <= '2020-09-1'", params = list(short_list$id[rownum], short_list$country[rownum])), error = function(e) e) })
使用
clusterMap
相同的效果。new_de <- clusterMap(clust, function(id, country) { mydb = OpenConn(dbname = '*', user = '*', password = '*', host = '**') on.exit({ DBI::dbDisconnect(mydb) }) tryCatch( DBI::dbGetQuery(mydb, " select * from table1 where id=? and country=? and source_event_date >= date >= '2019-04-31' and `date` <= '2020-09-1'", params = list(id, country)), error = function(e) e) }, short_list$id, short_list$country)
如果您不熟悉
Map
,这就像将多个向量/列表“压缩”在一起。例如:myfun1 <- function(i) paste(i, "alone") lapply(1:3, myfun1) ### "unrolls" to look like list( myfun1(1), myfun1(2), myfun1(3) ) myfun3 <- function(i,j,k) paste(i, j, k, sep = '-') Map(f = myfun3, 1:3, 11:13, 21:23) ### "unrolls" to look like list( myfun3(1, 11, 21), myfun3(2, 12, 22), myfun3(3, 13, 23) )
我在改编后的代码中获得了一些自由:
- 我从
dbSendQuery
/dbFetch
双击转换为对dbGetQuery
. - 我正在使用
DBI
函数,因为DBI
函数提供了每个驱动程序包提供的超集。(无论如何,您可能会使用其中的一些,也许没有意识到。)您可以毫无问题地切换回来。 - 我补充说
tryCatch
,因为有时错误在并行过程中很难处理。这意味着您需要检查每个进程的返回值,以查看是inherits(ret, "error")
(问题)还是is.data.frame
(正常)。 - 我
on.exit
这样使用,即使有问题,连接关闭仍然应该发生。
推荐阅读
- r - 一起使用 paste() 和 rep() 的问题
- python - 创建包含在字典中的 pandas 数据帧的 JSON 响应
- node.js - 如何在 npm package.json 脚本中使用日期格式变量?
- sql-server - 使用按钮将数据从 Excel 导入 SQL 表
- aggregate - 将两级数据放在一个表格工作表中并聚合它们
- python - 以 0.05 为增量四舍五入
- java - 有没有办法将它与正则表达式而不是循环匹配?
- mysql - 从MySql中的表中获取两个日期之间的所有月份
- python - 匹配具有交替字符的字符串
- python - 在散景标签中包含 folium