Issue reading a CSV and loading it into a GCP bucket using Google Cloud Composer

Problem description

I started running into a strange issue when trying to read a CSV from a GCS bucket and write it back to the same bucket. Note that the code below used to work for me, but the Airflow logs now throw an exception saying:

{models.py:1796} ERROR - Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffer
     when reading gs://file_bucket/abc.csv
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 103, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 108, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/handle_split_rows.py", line 56, in handle_split_rows
    lines= file_stream.read()
  File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 132, in read
    pywrap_tensorflow.ReadFromStream(self._read_buf, length, status)
  File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 528, in __exit__
    c_api.TF_GetCode(self.status.status)
tensorflow.python.framework.errors_impl.FailedPreconditionError: Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffer
     when reading gs://file_bucket/abc.csv

Code:

#!/usr/bin/env python
import os
import json
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import python_operator
from airflow.contrib.hooks import gcs_hook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators import PythonOperator, BranchPythonOperator
from airflow.operators import BashOperator
from lib import notification_utility

default_args = {
    'owner': os.environ["OWNER"],
    'depends_on_past': False,
    'start_date': '2019-10-10 09:31:00'
}

with DAG('parse_bad_rows',
    default_args=default_args,
    catchup=False,
    schedule_interval= None
) as dag:
    def parse_rows(**context):
        import pandas as pd
        import numpy as np
        import csv
        import os
        import gcsfs
        from tensorflow.python.lib.io import file_io
        from pandas.compat import StringIO
        import io
        #**tf.disable_v2_behavior() also tried disabling v1 just in case but i dont think it makes any sense**
        #updated_file_list = context['ti'].xcom_pull(task_ids='list_files_delta_bucket_test')
        fs = gcsfs.GCSFileSystem(project='project_name')
        updated_file_list = fs.ls('/bucket_name/foldername/')
        updated_file_list = [ x for x in updated_file_list if "abc" in x ]
        print("updated_file_list------------------>",updated_file_list)
        for f in updated_file_list:
            print("File Being processed------->",f)
            file_name = os.path.splitext(f)[0]
            #**this is where the job is failing while reading the file so I am assuming it has to do something with tensorflow.python.lib.io import file_io**
            file_stream = file_io.FileIO("gs://"+f, mode='r')
            lines= file_stream.read()
            file_stream_less_cols   =io.StringIO(lines)
            Split_Rows = [x for x in file_stream_less_cols if x.count('|') < 297]
            Split_Rows = ' '.join(map(str, Split_Rows))
            file_stream.close()
            Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows),columns=['BLOB_COLUMN'],dtype='str')
            #Split_Rows_Stream['File_Name'] = Split_Rows_Stream.index
            parse_names = file_name.split('/')
            filename = parse_names[2]
            bucketname  = parse_names[0]
            Split_Rows_Stream['FILE_NAME'] = filename
            print("bucketname------------>",bucketname)
            print("filename------------->",filename)
            Split_Rows_Stream.to_csv("gs://"+bucketname+"/ERROR_FILES/"+filename+".csv",encoding='utf-8',quoting=csv.QUOTE_NONE,escapechar='|')

    Python_Task_Split_Rows = PythonOperator(
                             task_id= 'split_rows_to_process_test',
                             provide_context=True,
                             python_callable=parse_rows,
                             #op_kwargs={'project':'project_name','bucket':'bucket_name','table_name':'abc','delim_num':297},
                             #trigger_rule=TriggerRule.ALL_SUCCESS,
                             dag=dag
                            )
    # Orchestration
    Python_Task_Split_Rows

I also tried the same logic locally, to make sure the CSV itself is not the problem.

import pandas as pd
import numpy as np
import csv
import io
import os
#Read the file
directory='c:\\Users\\BG/Downloads/file_Cleansing'
for filename in os.listdir(directory):
    file_name = filename.split('.')[0]
    f=open('c:\\Users\\BG/Downloads/file_Cleansing/'+filename,'r',encoding="utf8")
    #Read lines from the text file
    lines= f.read()
    #cleanse the lines
    file_stream   =io.StringIO(lines)
    Split_Rows    = [x for x in file_stream if x.count('|') < 297]
    Split_Rows = ' '.join(map(str, Split_Rows))
    f.close()
    Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows),columns=['blob'])
    Split_Rows_Stream["File_Name"] = file_name
    Split_Rows_Stream.to_csv("c:\\Users\\BG/Downloads/file_Cleansed/"+file_name+"_error.csv",escapechar='|',encoding='utf-8')

The above works as expected. My goal is to find the records that do not match the expected number of delimiters per row (my delimiter is the pipe, and each row should contain 297 pipes since this CSV has 298 columns, but some rows have stray pipes inside the data).

I want to capture those records, write them to a CSV, and then load that CSV into a BigQuery table so I can join them back to the original rows (using SQL LEAD or LAG, ordering and grouping by file name and index number) and repair/recover as many records as possible; a sketch of that load step follows below.
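For reference, the BigQuery load step I have in mind would look roughly like the sketch below, reusing the GoogleCloudStorageToBigQueryOperator that is already imported in the DAG above; the dataset, table, and schema here are placeholders rather than my real ones.

# Sketch only: the dataset/table and schema below are placeholders.
load_error_rows_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id='load_error_rows_to_bq',
    bucket='bucket_name',                      # placeholder bucket
    source_objects=['ERROR_FILES/*.csv'],      # files written by parse_rows
    destination_project_dataset_table='project_name.error_dataset.error_rows',
    schema_fields=[
        {'name': 'row_index',   'type': 'INTEGER', 'mode': 'NULLABLE'},  # pandas index column
        {'name': 'BLOB_COLUMN', 'type': 'STRING',  'mode': 'NULLABLE'},
        {'name': 'FILE_NAME',   'type': 'STRING',  'mode': 'NULLABLE'},
    ],
    skip_leading_rows=1,                       # header row written by to_csv
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

Python_Task_Split_Rows >> load_error_rows_to_bq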

Finally, my service account was changed recently, so this might also be some permissions issue on GCP.

Any suggestions are appreciated.

Thank you for your time.

Tags: python-3.x, tensorflow, google-cloud-platform, airflow, google-cloud-composer

Solution


This looks like an issue related to [permissions][1]. Verify that your service account is listed in the bucket's permissions and that it has a role that allows it to read and/or write.
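If you want to confirm this from inside the Composer environment, a quick check along the lines of the sketch below shows which permissions the current service account actually holds on the bucket; this assumes the google-cloud-storage client library is available, and the project/bucket names are placeholders:

from google.cloud import storage  # assumed to be available in the Composer environment

client = storage.Client(project='project_name')   # placeholder project
bucket = client.bucket('bucket_name')              # placeholder bucket

# Ask GCS which of these permissions the calling service account holds on the bucket.
granted = bucket.test_iam_permissions(
    ['storage.objects.get', 'storage.objects.list', 'storage.objects.create'])
print('granted permissions:', granted)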

I have reproduced your scenario with your code to read the file, and it works fine:

from tensorflow.python.lib.io import file_io
import gcsfs
import os, io

fs = gcsfs.GCSFileSystem(project='project_name')
updated_file_list = fs.ls('bucket')
updated_file_list = [ x for x in updated_file_list if "filename" in x ]
print("updated_file_list------------------>",updated_file_list)

for f in updated_file_list:
    print("File Being processed------->",f)
    file_name = os.path.splitext(f)[0]
    #**this is where the job is failing while reading the file so I am assuming it has to do something with tensorflow.python.lib.io import file_io**
    file_stream = file_io.FileIO("gs://"+f, mode='r')
    lines= file_stream.read()
    print(lines)

Output:

('updated_file_list------------------>', [u'bucket/file'])
('File Being processed------->', u'bucket/file')
this is a text from a file
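
As a side note: the traceback in your question reports 134,221,820 response bytes received for a 134,217,728-byte (128 MiB) buffer, so if the object has grown past that size you may want to avoid a single large read() call. A minimal sketch that streams the file line by line with gcsfs (which your DAG already uses) is shown below; the project, bucket, and prefix names are placeholders:

import csv
import os

import gcsfs
import pandas as pd

fs = gcsfs.GCSFileSystem(project='project_name')   # placeholder project

for path in fs.ls('bucket_name/foldername/'):       # placeholder bucket/prefix
    if 'abc' not in path:
        continue
    bucket, _, blob = path.partition('/')
    filename = os.path.splitext(os.path.basename(blob))[0]

    # Stream the object line by line instead of pulling it into memory with one read().
    with fs.open(path, 'r', encoding='utf-8') as fh:
        bad_rows = [line.rstrip('\n') for line in fh if line.count('|') < 297]

    if bad_rows:
        df = pd.DataFrame({'BLOB_COLUMN': bad_rows}, dtype='str')
        df['FILE_NAME'] = filename
        df.to_csv('gs://' + bucket + '/ERROR_FILES/' + filename + '.csv',
                  encoding='utf-8', quoting=csv.QUOTE_NONE, escapechar='|')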
