Parsing XML in Databricks Spark Scala on AWS with Maven - DailyMed's HL7 V3 Files

Problem Description

I am pulling the human prescription label files from DailyMed - Download All Drug Labels. These .xml files are in the HL7 V3 format, which has proven difficult to parse despite the correct libraries being installed on my cluster (see the installation instructions for Maven XML parsing on an AWS cluster in Databricks). Does anyone have tips or an example of correctly parsing these file types from .xml into a Spark DataFrame?

My current approach involves retrieving all the files and storing them in DBFS.

%scala
import java.net.URL
import java.io.File
import org.apache.commons.io.FileUtils

FileUtils.copyURLToFile(new URL("https://dailymed-data.nlm.nih.gov/public-release-files/dm_spl_release_human_rx_part1.zip"), new File("/dbfs/FileStore/your_path_here/dm_spl_release_human_rx_part1.zip"))
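The release is split across several zip parts; a sketch of fetching them all with Python's standard library instead (the URL pattern and the number of parts are assumptions extrapolated from part1 above and may change on DailyMed's side):

```python
import urllib.request
from pathlib import Path

BASE = "https://dailymed-data.nlm.nih.gov/public-release-files"

def release_urls(n_parts):
    """Build the download URLs for the human Rx release parts (pattern assumed from part1)."""
    return [f"{BASE}/dm_spl_release_human_rx_part{i}.zip" for i in range(1, n_parts + 1)]

def download_all(n_parts, dest_dir="/dbfs/FileStore/your_path_here"):
    """Download every part into DBFS via the /dbfs FUSE mount."""
    Path(dest_dir).mkdir(parents=True, exist_ok=True)
    for url in release_urls(n_parts):
        target = Path(dest_dir) / url.rsplit("/", 1)[-1]
        urllib.request.urlretrieve(url, target)
```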

Unzip the downloaded file:

%sh
unzip -vu '/dbfs/FileStore/your_path_here/dm_spl_release_human_rx_part1.zip'  -d /dbfs/FileStore/your_path_here/

Unzip the zip files contained in the extracted folder (start):

%sh
for file in /dbfs/FileStore/your_path_here/prescription/*.zip
do 
unzip -j "$file" '*.xml' -d /dbfs/FileStore/your_path_here/xml/
done
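The same nested extraction can be done without shelling out, using Python's standard `zipfile` module (a sketch; the paths are placeholders, and like `unzip -j` above it flattens the internal directory structure):

```python
import zipfile
from pathlib import Path

def extract_xml_from_zips(src_dir, dest_dir):
    """Extract every *.xml entry from each zip under src_dir into dest_dir (flattened)."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    extracted = []
    for zpath in sorted(Path(src_dir).glob("*.zip")):
        with zipfile.ZipFile(zpath) as zf:
            for name in zf.namelist():
                if name.lower().endswith(".xml"):
                    # Mirror `unzip -j`: keep only the file name, drop internal dirs
                    out = dest / Path(name).name
                    out.write_bytes(zf.read(name))
                    extracted.append(out)
    return extracted
```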

From here, parsing becomes difficult due to the unique structure of the HL7 V3 .xml format. I played around with converting to .json but ran into special-character issues. I am now resorting to removing the special characters and then parsing the .xml into a Spark DataFrame. Any tips on how someone has done this in Spark Scala would be great!
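One thing that trips up generic XML tooling here is that SPL documents live in the HL7 v3 namespace (`urn:hl7-org:v3`), so un-namespaced tag lookups find nothing. A minimal namespace-aware sketch with the standard library (the sample XML is made up, but follows the SPL shape with `setId` and LOINC-coded sections):

```python
import xml.etree.ElementTree as ET

NS = {"hl7": "urn:hl7-org:v3"}  # SPL documents declare this as the default namespace

SAMPLE = """<document xmlns="urn:hl7-org:v3">
  <setId root="ABD6ECF0-DC8E-41DE-89F2-1E36ED9D6535"/>
  <component><section>
    <code code="34067-9" codeSystem="2.16.840.1.113883.6.1"/>
    <text><paragraph>For the treatment of example-itis.</paragraph></text>
  </section></component>
</document>"""

def extract_sections(xml_text):
    """Return (set_id, {loinc_code: [paragraph texts]}) from an SPL document."""
    root = ET.fromstring(xml_text)
    set_id = root.find("hl7:setId", NS).attrib["root"]
    sections = {}
    for section in root.iter("{urn:hl7-org:v3}section"):
        code_el = section.find("hl7:code", NS)
        if code_el is None:
            continue
        paras = [p.text for p in section.iter("{urn:hl7-org:v3}paragraph") if p.text]
        sections[code_el.attrib.get("code")] = paras
    return set_id, sections
```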

Here is an update with the read attempt and the resulting output.

import com.databricks.spark.xml.schema_of_xml
import com.databricks.spark.xml.functions.from_xml
import spark.implicits._

val df = spark.read.format("xml").load("/FileStore/your_path_here/xml/ABD6ECF0-DC8E-41DE-89F2-1E36ED9D6535.xml")
// val payloadSchema = schema_of_xml(df.select("payload").as[String])
// val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

df.show()


Tags: java, xml, scala, maven, apache-spark

Solution


So, a developer on my team ( https://github.com/gardnmi ) helped parse the .xml documents and ultimately get them into a DataFrame. He did a great job! Posting it here in the hope that others can use it and contribute to it.

%python 
import pandas as pd
from xml.dom import minidom
import pathlib
from bs4 import BeautifulSoup
from collections import defaultdict

directory = '/dbfs/FileStore/your_path/your@domain.com/label/xml/'
files = pathlib.Path(f'{directory}').glob('*.xml')
rows = []
unscanned_files = []

for n, file in enumerate(files):
  if file.name == 'ABD6ECF0-DC8E-41DE-89F2-1E36ED9D6535_without_character_or_first_two_lines.xml':
    continue  # skip the hand-edited test file; `pass` would fall through and parse it anyway
  print(f'{n}: {file.name}')

  doc = minidom.parse(str(file))
  soup = BeautifulSoup(doc.toxml(), 'lxml')
  set_id = soup.find('setid')['root']
  text = defaultdict(list)
  
  indication_code = soup.find('code', attrs={'code': '34067-9'}) # Indication and Usage Heading
  unclassified_code = soup.find('code', attrs={'code': '42229-5'}) # Unclassified Heading
  
  # File may not contain Indication and Usage Heading
  if indication_code:
    for sibling in indication_code.nextSiblingGenerator():
      if sibling.name and sibling.text:
        if sibling.name != 'component':
          paragraphs = sibling.find_all('paragraph')
          if paragraphs:
            for paragraph in paragraphs:
              text['34067-9'].append(paragraph.text.strip('\n').replace("\n", ""))            
              # Some Text is contained within lists.  See file 002bf3fe-96c9-4969-b5f8-8818a98be6b2.xml
              for sibling in paragraph.nextSiblingGenerator():
                if sibling.name and sibling.text:
                  lists = sibling.find_all('item')
                  if lists:
                    for list_tag in lists:
                      text['34067-9'].append(list_tag.text.strip('\n').replace("\n", ""))
                         
        else:
          unclassified_code = sibling.find('code', attrs={'code': '42229-5'}) # Code 42229-5 is used for Structured Product Labeling Unclassified Section   
          if unclassified_code:
            for sibling in unclassified_code.nextSiblingGenerator():
              if sibling.name and sibling.text:
                paragraphs = sibling.find_all('paragraph')
                if paragraphs:
                  for paragraph in paragraphs:
                    text['42229-5'].append(paragraph.text.strip('\n').replace("\n", ""))                   
                    # Some Text is contained within lists.  See file 002bf3fe-96c9-4969-b5f8-8818a98be6b2.xml
                    for sibling in paragraph.nextSiblingGenerator():
                      if sibling.name and sibling.text:
                        lists = sibling.find_all('item')
                        if lists:
                          for list_tag in lists:
                            text['42229-5'].append(list_tag.text.strip('\n').replace("\n", ""))
                    
                    
  # Runs if no Indication and Usage Section Found.
  # Indications and Usage may be under the unclassified heading
  elif unclassified_code:
    for sibling in unclassified_code.nextSiblingGenerator():
      if sibling.name and sibling.text:
        paragraphs = sibling.find_all('paragraph')
        if paragraphs:
          for paragraph in paragraphs:
            text['42229-5'].append(paragraph.text.strip('\n').replace("\n", ""))        
            # Some Text is contained within lists.  See file 002bf3fe-96c9-4969-b5f8-8818a98be6b2.xml
            for sibling in paragraph.nextSiblingGenerator():
              if sibling.name and sibling.text:
                lists = sibling.find_all('item')
                if lists:
                  for list_tag in lists:
                    text['42229-5'].append(list_tag.text.strip('\n').replace("\n", ""))            

  
  # If no LOINC heading is found, return None
  else:
    text[None].append(None)
    unscanned_files.append(file.name)
    
  for k,v in text.items():
    for n,l in enumerate(v):
      rows.append((
        file.name, # xml file
        set_id, # drug id
        k,# code https://www.fda.gov/industry/structured-product-labeling-resources/section-headings-loinc 
        n+1,# number of text found 
        l # text
      ))
      
df = pd.DataFrame(rows, columns=['file_name', 'set_id', 'loinc', 'loinc_count_per_file', 'loinc_paragraph_text'])
sdf = spark.createDataFrame(df)
spark.sql("DROP TABLE IF EXISTS sandbox.humanPrescriptionLabel_xml")
sdf.write.mode('overwrite').saveAsTable('sandbox.humanPrescriptionLabel_xml')
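The row-building loop at the end, and the `strip('\n').replace(...)` pattern repeated throughout, could be pulled into small helpers to cut down on duplication (a sketch only; the column order matches the DataFrame above):

```python
def clean_text(raw):
    """Mirror the paragraph.text.strip('\\n').replace('\\n', '') pattern used above."""
    return raw.strip("\n").replace("\n", "")

def rows_from_text(file_name, set_id, text):
    """Flatten a per-file {loinc_code: [texts]} dict into (file, set_id, loinc, n, text) rows."""
    out = []
    for code, values in text.items():
        for n, value in enumerate(values, start=1):
            out.append((file_name, set_id, code, n, value))
    return out
```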
