Efficient way to optimize Scala code for reading a large file that doesn't fit in memory

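Problem description

The problem statement is as follows.

We have a large log file that stores user interactions with an application. Entries in the log file follow the schema {userId, timestamp, actionType}, where actionType is one of two possible values: [open, close].

Constraints:

  1. The log file is too large to fit in memory on a single machine. Also assume that the aggregated data does not fit in memory.
  2. The code must be able to run on a single machine.
  3. Do not use an out-of-the-box MapReduce implementation or a third-party database; do not assume we have Hadoop, Spark, or any other distributed computing framework.
  4. Each user can have multiple entries of each actionType, and entries may be missing from the log file. So a user might be missing a close record between two open records, or vice versa.
  5. Timestamps are in strictly ascending order.

For this problem, we need to implement one or more classes that compute the average time each user spends between open and close. Keep in mind that some users have missing entries, so we have to choose how to handle them when doing the calculation. The code should follow a consistent policy regarding how that choice is made.

The desired output of the solution is [{userId, timeSpent}, ...] for all users in the log file.

Sample log file (comma-separated text file)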

1,1435456566,open 
2,1435457643,open 
3,1435458912,open 
1,1435459567,close 
4,1435460345,open 
1,1435461234,open 
2,1435462567,close 
1,1435463456,open 
3,1435464398,close 
4,1435465122,close 
1,1435466775,close

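Approach

Below is the code I wrote in Python and Scala. It does not seem efficient and does not meet the expectations of the given scenario, and I would like feedback from the developer community on this forum about how to better optimize it for this scenario.

Scala implementation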

import java.io.FileInputStream
import java.util.{Scanner, Map, LinkedList}
import java.lang.Long
import scala.collection.mutable

object UserMetrics extends App {
  if (args.length == 0) {
    println("Please provide input data file name for processing")
    sys.exit(1) // stop here; otherwise args(0) below throws when no file name is given
  }
  val userMetrics = new UserMetrics()
  userMetrics.readInputFile(args(0), if (args.length == 1) 600000 else args(1).toInt)
}

case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)

class UserMetrics {

  val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()

  def readInputFile(stArr:String, timeOut: Int) {
    var inputStream: FileInputStream = null
    var sc: Scanner = null
    try {
      inputStream = new FileInputStream(stArr);
      sc = new Scanner(inputStream, "UTF-8");
      while (sc.hasNextLine()) {
        val line: String = sc.nextLine();
        processInput(line, timeOut)
      }
      
      for ((key: Integer, userLs: LinkedList[UserInfo]) <- usermap) {
        val userInfo:UserInfo = userLs.get(0)
        val timespent = if (userInfo.occurence>0) userInfo.timeSpent/userInfo.occurence else 0
        println("{" + key +","+timespent + "}")
      }

      if (sc.ioException() != null) {
        throw sc.ioException();
      }
    } finally {
      if (inputStream != null) {
        inputStream.close();
      }
      if (sc != null) {
        sc.close();
      }
    }
  }

  def processInput(line: String, timeOut: Int) {
    val strSp = line.split(",")

    val userId: Integer = Integer.parseInt(strSp(0))
    val curTimeStamp = Long.parseLong(strSp(1))
    val status = strSp(2)
    val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0L, 0) // 0L: an Int literal does not convert to java.lang.Long
    val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()

    val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)

    if (lsUserInfo != null && lsUserInfo.size() > 0) {
      val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
      val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
      val prevStatus: String = lastUserInfo.prevStatus
      
      if (prevStatus.equals("open")) {
        if (status.equals(lastUserInfo.prevStatus)) {
           val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
           val timeDiff = lastUserInfo.timeSpent + timeSelector
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        } else if(!status.equals(lastUserInfo.prevStatus)){
          val timeDiff = lastUserInfo.timeSpent + curTimeStamp - prevTimeStamp
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        }
      } else if(prevStatus.equals("close")) {
        if (status.equals(lastUserInfo.prevStatus)) {
          lsUserInfo.remove()
          val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence+1))
        }else if(!status.equals(lastUserInfo.prevStatus))
          {     
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))
        }
      }
    }else if(lsUserInfo.size()==0){
      lsUserInfo.add(uInfo)
    }
    usermap.put(userId, lsUserInfo)
  }

}

Python implementation

import sys

def fileBlockStream(fp, number_of_blocks, block):
    #A generator that splits a file into blocks and iterates over the lines of one of the blocks.
 
    assert 0 <= block and block < number_of_blocks #Assertions to validate number of blocks given
    assert 0 < number_of_blocks
 
    fp.seek(0,2) #seek to end of file to compute block size
    file_size = fp.tell() 
 
    ini = file_size * block // number_of_blocks #compute start & end point of file block (integer offsets, required by seek)
    end = file_size * (1 + block) // number_of_blocks
 
    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini-1)
        fp.readline()
 
    while fp.tell() < end:
        yield fp.readline() #iterate over lines of the particular chunk or block

def computeResultDS(chunk,avgTimeSpentDict,defaultTimeOut):
    countPos,totTmPos,openTmPos,closeTmPos,nextEventPos = 0,1,2,3,4
    for rows in chunk.splitlines():
        if len(rows.split(",")) != 3:
            continue
        userKeyID = rows.split(",")[0]
        try:
            curTimeStamp = int(rows.split(",")[1])
        except ValueError:
            print("Invalid Timestamp for ID:" + str(userKeyID))
            continue
        curEvent = rows.split(",")[2]
        if userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "close": 
        #Check if already existing userID with expected Close event 0 - Open; 1 - Close
        #Array value within dictionary stores [No. of pair events, total time spent (Close tm-Open tm), Last Open Tm, Last Close Tm, Next expected Event]
            curTotalTime = curTimeStamp - avgTimeSpentDict[userKeyID][openTmPos]
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][closeTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 0 #Change next expected event to Open
        
        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "open":
            avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 1 #Change next expected event to Close
        
        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "open":
            curTotalTime,closeTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][openTmPos],curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos]=totalTime
            avgTimeSpentDict[userKeyID][closeTmPos]=closeTime
            avgTimeSpentDict[userKeyID][openTmPos]=curTimeStamp
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount          

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "close": 
            curTotalTime,openTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][closeTmPos],curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos]=totalTime
            avgTimeSpentDict[userKeyID][openTmPos]=openTime
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount

        elif curEvent == "open":
            #Initialize userid with Open event
            avgTimeSpentDict[userKeyID] = [0,0,curTimeStamp,0,1]
        
        elif curEvent == "close":
            #Initialize userid with missing handler function since there is no Open event for this User
            totaltime,OpenTime = missingHandler(defaultTimeOut,0,curTimeStamp)
            avgTimeSpentDict[userKeyID] = [1,totaltime,OpenTime,curTimeStamp,0]

def missingHandler(defaultTimeOut,curTimeVal,lastTimeVal):
    if lastTimeVal - curTimeVal > defaultTimeOut:
        return defaultTimeOut,curTimeVal
    else:
        return lastTimeVal - curTimeVal,curTimeVal

def computeAvg(avgTimeSpentDict,defaultTimeOut):
    resDict = {}
    for k,v in avgTimeSpentDict.items():
        if v[0] == 0:
            resDict[k] = 0
        else:
            resDict[k] = v[1]//v[0]
    return resDict

if __name__ == "__main__":
    avgTimeSpentDict = {}
    if len(sys.argv) < 2:
        print("Please provide input data file name for processing")
        sys.exit(1)
        
    fileObj = open(sys.argv[1])
    number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
    defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
    for chunk_number in range(number_of_chunks):
        for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
            computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
    print (computeAvg(avgTimeSpentDict,defaultTimeOut))
    avgTimeSpentDict.clear() #Nullify dictionary 
    fileObj.close() #Close the file object

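Both programs above produce the desired output, but efficiency matters for this particular scenario. If you have a better approach or any suggestions for the existing implementations, please let me know.

Thanks in advance!!

Tags: scala, optimization, file-io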

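Solution

What you are after is the use of iterators. I'm not going to rewrite your code, but the trick here is most likely to use an iterator. Fortunately, Scala provides decent out-of-the-box tools for the job.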

import scala.io.Source
object ReadBigFiles {
  def read(fileName: String): Unit = {
    val source = Source.fromFile(fileName)
    try {
      val lines: Iterator[String] = source.getLines()
      // now you get iterator semantics for the file line traversal
      // that means you can only go through the lines once, but you don't incur a penalty on heap usage
    } finally {
      source.close() // release the file handle once traversal is done
    }
  }
}
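
The iterator only hands back raw strings, so each line still has to be turned into the {userId, timestamp, actionType} schema from the question. Here is a minimal sketch of that step. The names `LogParsing`, `LogEvent`, `parseLine` and `events` are made up for illustration, and dropping malformed lines silently is an assumption on my part, not something the question prescribes.

```scala
import scala.util.Try

object LogParsing {
  // Illustrative record type mirroring {userId, timestamp, actionType}.
  final case class LogEvent(userId: Int, timestamp: Long, action: String)

  // Returns None for lines that do not have three fields, a known action,
  // or numeric id/timestamp values (assumed policy: skip such lines).
  def parseLine(line: String): Option[LogEvent] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, action) if action == "open" || action == "close" =>
        Try(LogEvent(id.toInt, ts.toLong, action)).toOption
      case _ => None
    }

  // Lazily parses the stream; nothing is materialised until it is consumed.
  def events(lines: Iterator[String]): Iterator[LogEvent] =
    lines.flatMap(line => parseLine(line).iterator)
}
```

The `trim` matters for the sample log above, where some lines end with a trailing space after the action. Because `events` returns another iterator, chaining it onto `getLines()` keeps the one-pass, constant-heap behaviour.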

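For your use case, you seem to need a lastUser, so you are dealing with groups of 2 entries. I think you have two options: either go for iterator.sliding(2), which yields each pair of consecutive entries in turn, or simply add recursion to the mix, using an Option to carry the previous entry.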

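import scala.annotation.tailrec

// User, ResultType, parse and combine are placeholders for your own types and business logic.
@tailrec
def navigate(source: Iterator[String], last: Option[User], acc: ResultType): ResultType = {
  if (source.hasNext) {
    val current: User = parse(source.next())
    last match {
      case Some(existing) =>
        // compare with previous user etc. and fold the outcome into the result
        navigate(source, Some(current), combine(acc, existing, current))
      case None => navigate(source, Some(current), acc)
    }
  } else {
    acc // exit recursion, return result
  }
}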
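
The first option, iterator.sliding(2), can be sketched as well. Note that in the sample log adjacent lines usually belong to different users, so pairs of neighbouring lines are better suited to stream-wide checks than to matching a user's open with their close. The sketch below uses that for the "timestamps are ascending" constraint. `SlidingPairs`, `timestampOf` and `countOutOfOrder` are illustrative names, and the code assumes every line is well formed.

```scala
object SlidingPairs {
  // The timestamp is the second comma-separated field, per the schema in the question.
  def timestampOf(line: String): Long = line.split(",")(1).trim.toLong

  // Counts adjacent pairs of lines whose timestamps go backwards.
  def countOutOfOrder(lines: Iterator[String]): Int =
    lines
      .map(timestampOf)
      .sliding(2)
      .count {
        case Seq(prev, next) => next < prev
        case _               => false // a lone element has no partner to compare with
      }
}
```

Only two timestamps are held at any moment, so this stays cheap on heap no matter how long the file is.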

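You can avoid all the code you wrote for reading the file and so on. If you need to count occurrences, just build a Map inside the recursion and increment the occurrence count at each step according to your business logic.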
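
To make the Map idea concrete, here is a rough sketch that uses foldLeft in place of explicit recursion (the two are interchangeable here). The policy is my own assumption, since the question leaves it open: a close pairs with the most recent unmatched open of the same user, a repeated open replaces the pending one, and a close without a pending open is ignored. `AverageSession`, `State`, `step` and `averages` are illustrative names, and the code assumes numeric id and timestamp fields.

```scala
object AverageSession {
  // Per-user running state: pending open timestamp (if any), total time, completed pairs.
  final case class State(pendingOpen: Option[Long], total: Long, pairs: Long)

  private val Empty = State(None, 0L, 0L)

  // Applies one log line to the accumulated per-user state (assumed policy, see above).
  def step(acc: Map[Int, State], line: String): Map[Int, State] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, action) =>
        val user = id.toInt
        val time = ts.toLong
        val st = acc.getOrElse(user, Empty)
        action match {
          case "open" =>
            acc.updated(user, st.copy(pendingOpen = Some(time)))
          case "close" =>
            st.pendingOpen match {
              case Some(opened) =>
                acc.updated(user, State(None, st.total + (time - opened), st.pairs + 1))
              case None => acc // close without a pending open: ignored
            }
          case _ => acc // unknown action: ignored
        }
      case _ => acc // malformed line: ignored
    }

  // Average time per user; users with no completed pair report 0.
  def averages(lines: Iterator[String]): Map[Int, Long] =
    lines.foldLeft(Map.empty[Int, State])(step).map {
      case (user, st) => user -> (if (st.pairs > 0) st.total / st.pairs else 0L)
    }
}
```

On the sample log from the question this policy gives 3160 for user 1, 4924 for user 2, 5486 for user 3 and 4777 for user 4. Be aware that the sketch keeps one small State per user in memory, which the first constraint says may not fit for the full data set; one possible way around that would be to first split the log into several files by a hash of the userId and run the same fold on each file in turn.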

