首页 > 解决方案 > 如何在 R 中使用 Sequel Pro 从 MySQL 中并行获取数据

问题描述

我想在 R 中使用 seqlpro 从 mysql 获取数据,但是当我运行查询时需要很长时间。这是我的代码:

old_value<- data.frame()
new_value<- data.frame()
counter<- 0
for (i in 1:length(short_list$id)) {
mydb = OpenConn(dbname = '**', user = '**', password = '**', host = '**')
query <- paste0("select * from table where id IN (",short_list$id[i],") and country IN ('",short_list$country[i],"')  and date >= '2019-04-31' and `date` <= '2020-09-1';", sep = "" )
temp_old <- RMySQL::dbFetch(RMySQL::dbSendQuery(mydb, query), n = -1
query <- paste0("select * from table2 where id IN (",short_list$id[i],") and country IN ('",short_list$country[i],"') and date >= '2019-04-31' and `date` <= '2020-09-1';", sep = "" )
temp_new <- RMySQL::dbFetch(RMySQL::dbSendQuery(mydb, query), n = -1)
RMySQL::dbDisconnect(mydb)
new_value<- rbind(temp_new,new_value)
old_value<- rbind(temp_old,old_value)
counter=counter+1

base::print(paste("completed for ",counter),sep="")
}

有什么方法可以让我写得更高效并更快地调用查询,因为我有大约 5000 行应该进入循环。实际上,此查询有效,但需要时间。

我已经尝试过了,但它仍然给我错误:

    #parralel computing 
    clust <- makeCluster(length(6))
    clusterEvalQ(cl = clust, expr = lapply(c('data.table',"RMySQL","dplyr","plyr"), library, character.only = TRUE))
    clusterExport(cl = clust, c('config','short_list'), envir = environment())
    new_de <- parLapply(clust, short_list, function(id,country) {
for (i in 1:length(short_list$id)) {
      mydb = OpenConn(dbname = '*', user = '*', password = '*', host = '**')
      query <- paste0("select * from table1 where id IN (",short_list$id[i],") and country IN ('",short_list$country[i],"') and source_event_date >= date >= '2019-04-31' and `date` <= '2020-09-1';", sep = "" )
      temp_data <- RMySQL::dbFetch(RMySQL::dbSendQuery(mydb, query), n = -1) %>% data.table::data.table()
      RMySQL::dbDisconnect(mydb)
      return(temp_data)}
    })
    stopCluster(clust)
    gc(reset = T)
    new_de <- data.table::rbindlist(new_de, use.names = TRUE)

我还定义了 short_list 列表如下:

short_list<- as.list(short_list)

里面 short_list 是:

id.  country 
2     US
3     UK
...   ...

但是它给了我这个错误:

checkForRemoteErrors(val) 中的错误:一个节点产生错误:找不到对象“i”

但是,当我从 id[i] 和 country[i] 中删除 i 时,它只会给我第一行结果,而不是获得所有 id 和国家/地区结果。

标签: mysqlsqlrparallel-processing

解决方案


我认为另一种方法是将您需要的 id 上传到临时表中,然后一次查询所有内容。

tmptable <- "mytemptable"
dbWriteTable(conn, tmptable, short_list, create = TRUE)
alldat <- dbGetQuery(conn, paste("
  select t1.*
  from ", tmptable, " tmp
    left join table1 t1 on tmp.id=t1.id and tmp.country=t1.country
  where t1.`date` >= '2019-04-31' and t1.`date` <= '2020-09-1'"))
dbExecute(conn, paste("drop table", tmptable))

(许多 DBMS 使用前导#来指示仅对本地用户可见的临时表,在模式命名空间中不太可能发生冲突,并且在连接关闭时自动清理。我通常鼓励在此处使用临时表,请在此处与您的数据库文档、架构和/或 DBA 联系以获取更多信息。)

表的顺序很重要:通过拉取所有from mytemptable然后left join table1放到它上面,我们有效地过滤掉了table1不包含匹配的任何数据idand country

这并不能解决数据下载的速度问题,但对此有一些想法:

  1. 每次您遍历查询时,您的开销都不是微不足道的;如果有很多数据,那么这个开销应该不会很大,但它仍然存在。使用单个查询将显着减少这种开销。
  2. 查询时间也可能受到表上任何索引(ices)的影响。超出了本次讨论的范围,但如果您有一个很大的表,则可能是相关的。如果表没有被有效地索引(或者查询的结构没有很好地使用这些索引),那么每个查询将花费有限的时间来“编译”并返回数据。同样,通过单个更高效的查询将减少开销。
  3. 大型查询可能会受益于使用命令行工具mysql;它几乎与您将获得的一样快,并且可能会解决RMySQL和/或中的任何问题DBI。(我并不是说它们效率低下,但是......免费的开源驱动程序不太可能比 MySQL 自己的命令行实用程序更快。

至于并行执行此操作...

  1. 你使用parLapply不正确。它接受单个向量/列表并迭代该列表中的每个对象。您可以使用它迭代帧的索引,但不能使用它迭代该帧中的多个列。这与 base R 完全一样lapply

    让我们看看当你打电话时发生了什么。我将其替换为lapply(因为在多个进程中进行调试很困难)。

    # parLapply(clust, mtcars, function(id, country) { ... })
    lapply(mtcars, function(id, country) { browser(); 1; })
    # Called from: FUN(X[[i]], ...)
    debug at #1: [1] 1
    id
    #  [1] 21.0 21.0 22.8 21.4 18.7 18.1 14.3 24.4 22.8 19.2 17.8 16.4 17.3 15.2 10.4 10.4 14.7 32.4 30.4 33.9 21.5 15.5 15.2
    # [24] 13.3 19.2 27.3 26.0 30.4 15.8 19.7 15.0 21.4
    country
    # Error: argument "country" is missing, with no default
    

    因为参数(mtcars在这里,short_list在你的)是 a data.frame,因为它是一个list-like 对象,lapply(and parLapply)一次对每一进行操作。您希望它将“解压缩”数据,将第一列的id值应用于country. 实际上,这是一个执行此操作的函数:(Map和并行的clusterMap,正如我在评论中所建议的那样)。稍后再谈。

  2. 并行化事物的目的是使用for并行函数内部的循环。如果 short_list有 10 行,并且如果您的使用parLapply是正确的,那么您将查询所有行 10 次,使您的问题明显更糟。在伪代码中,您将执行以下操作:

    parallelize for each row in short_list:
        # this portion is run simultaneously in 10 difference processes/threads
        for each row in short_list:
            query for data related to this row
    

两种选择:

  1. 提供一个参数来parLapply表示框架的行。

    new_de <-     new_de <- parLapply(clust, seqlen(NROW(short_list)), function(rownum) {
      mydb = OpenConn(dbname = '*', user = '*', password = '*', host = '**')
      on.exit({ DBI::dbDisconnect(mydb) })
      tryCatch(
        DBI::dbGetQuery(mydb, "
          select * from table1
          where id=? and country=?
            and source_event_date >= date >= '2019-04-31' and `date` <= '2020-09-1'",
          params = list(short_list$id[rownum], short_list$country[rownum])),
        error = function(e) e)
    })
    
  2. 使用clusterMap相同的效果。

    new_de <- clusterMap(clust, function(id, country) {
      mydb = OpenConn(dbname = '*', user = '*', password = '*', host = '**')
      on.exit({ DBI::dbDisconnect(mydb) })
      tryCatch(
        DBI::dbGetQuery(mydb, "
          select * from table1
          where id=? and country=?
            and source_event_date >= date >= '2019-04-31' and `date` <= '2020-09-1'",
          params = list(id, country)),
        error = function(e) e)
    }, short_list$id, short_list$country)
    

    如果您不熟悉Map,这就像将多个向量/列表“压缩”在一起。例如:

    myfun1 <- function(i) paste(i, "alone")
    lapply(1:3, myfun1)
    ### "unrolls" to look like
    list(
      myfun1(1),
      myfun1(2),
      myfun1(3)
    )
    
    myfun3 <- function(i,j,k) paste(i, j, k, sep = '-')
    Map(f = myfun3, 1:3, 11:13, 21:23)
    ### "unrolls" to look like
    list(
      myfun3(1, 11, 21),
      myfun3(2, 12, 22),
      myfun3(3, 13, 23)
    )
    

我在改编后的代码中获得了一些自由:

  1. 我从dbSendQuery/dbFetch双击转换为对dbGetQuery.
  2. 我正在使用DBI函数,因为DBI函数提供了每个驱动程序包提供的超集。(无论如何,您可能会使用其中的一些,也许没有意识到。)您可以毫无问题地切换回来。
  3. 我补充说tryCatch,因为有时错误在并行过程中很难处理。这意味着您需要检查每个进程的返回值,以查看是inherits(ret, "error")(问题)还是is.data.frame(正常)。
  4. on.exit这样使用,即使有问题,连接关闭仍然应该发生。

推荐阅读