Read FASTQ files into an AWS Glue job script

Problem

I need to read a FASTQ file into an AWS Glue job script, but I am getting this error:

Traceback (most recent call last):
  File "/opt/amazon/bin/runscript.py", line 59, in <module>
    runpy.run_path(script, run_name='__main__')
  File "/usr/lib64/python3.7/runpy.py", line 261, in run_path
    code, fname = _get_code_from_file(run_name, path_name)
  File "/usr/lib64/python3.7/runpy.py", line 236, in _get_code_from_file
    code = compile(f.read(), fname, 'exec')
  File "/tmp/test20200930", line 24
    datasource0 = spark.createDataset(sc.textFile("s3://sample-genes-data/fastq/S_Sonnei_short_reads_1.fastq").sliding(4, 4).map {
                                                                                                                               ^
SyntaxError: invalid syntax

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/amazon/bin/runscript.py", line 92, in <module>
    while "runpy.py" in new_stack.tb_frame.f_code.co_filename:
AttributeError: 'NoneType' object has no attribute 'tb_frame'

Here is my code:

import org.apache.spark.mllib.rdd.RDDFunctions._

datasource0 = spark.createDataset(sc.textFile("s3://sample-genes-data/fastq/S_Sonnei_short_reads_1.fastq").sliding(4, 4).map {
  case Array(id, seq, _, qual) => (id, seq, qual)
 }).toDF("identifier", "sequence", "quality")
datasource1 = DynamicFrame.fromDF(datasource0, glueContext, "nullv")

I followed this link: Read FASTQ file into a Spark dataframe

Tags: amazon-web-services, pyspark, fastq, aws-glue-spark

Solution


I was able to run it by wrapping the code in a GlueApp object. Note that your traceback shows the file being compiled by the Python runner (runscript.py calls Python's compile() on it), so the Glue job's language must also be set to Scala rather than Python. You can use the following code after substituting your own S3 path.

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    import sparkSession.implicits._
    // Initialize the job so that Job.commit() below has a job run to commit.
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    // Group the flat line RDD into non-overlapping windows of 4 lines
    // (identifier, sequence, "+" separator, quality) and drop the separator.
    val datasource0 = sparkSession.createDataset(spark.textFile("s3://<s3path>").sliding(4, 4).map {
      case Array(id, seq, _, qual) => (id, seq, qual)
    }).toDF("identifier", "sequence", "quality")
    val datasource1 = DynamicFrame(datasource0, glueContext)
    datasource1.show()
    datasource1.printSchema()
    Job.commit()
  }
}
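The core of the fix is how sliding(4, 4) turns the flat line RDD into 4-line FASTQ records. As a rough local illustration (plain Scala, no Spark; Iterator.grouped stands in for the RDD sliding window, and the sample lines are made up to match the input below):

```scala
object FastqGroupingSketch extends App {
  // Sample FASTQ lines: two records of 4 lines each.
  val lines = Seq(
    "@seq1", "AGTCAGTCGAC", "+", "?@@FFBFFDDH",
    "@seq2", "CCAGCGTCTCG", "+", "?88ADA?BDF8"
  )
  // Non-overlapping windows of 4 lines; the "+" separator line is dropped.
  val records = lines.grouped(4).collect {
    case Seq(id, seq, _, qual) => (id, seq, qual)
  }.toList
  // List((@seq1,AGTCAGTCGAC,?@@FFBFFDDH), (@seq2,CCAGCGTCTCG,?88ADA?BDF8))
  println(records)
}
```

Each window of four lines becomes one (identifier, sequence, quality) tuple, which is exactly what toDF then names into columns.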

Given the input:

@seq1
AGTCAGTCGAC
+
?@@FFBFFDDH
@seq2
CCAGCGTCTCG
+
?88ADA?BDF8

the output is:

{"identifier": "@seq1", "sequence": "AGTCAGTCGAC", "quality": "?@@FFBFFDDH"}
{"identifier": "@seq2", "sequence": "CCAGCGTCTCG", "quality": "?88ADA?BDF8"}
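If you want to persist the parsed records rather than only show them, a sketch like the following (untested; the output path is a placeholder) could be added before Job.commit(), using the Glue Scala sink API:

```scala
import com.amazonaws.services.glue.util.JsonOptions

// Assumes glueContext and datasource1 from the job above;
// replace the S3 output path with your own bucket/prefix.
glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions("""{"path": "s3://<s3path>/parsed/"}"""),
  format = "json"
).writeDynamicFrame(datasource1)
```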
