python-3.x - Problem reading a CSV and loading it into a GCP bucket using Google Cloud Composer
Problem Description
I started running into a strange problem when trying to read a CSV from a GCP bucket and write back to the same bucket. Note that the code below used to work for me, but now the Airflow logs throw an exception saying:
{models.py:1796} ERROR - Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffer
	 when reading gs://file_bucket/abc.csv
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 103, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 108, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/handle_split_rows.py", line 56, in handle_split_rows
    lines = file_stream.read()
  File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 132, in read
    pywrap_tensorflow.ReadFromStream(self._read_buf, length, status)
  File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 528, in __exit__
    c_api.TF_GetCode(self.status.status)
tensorflow.python.framework.errors_impl.FailedPreconditionError: Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffer
	 when reading gs://file_bucket/abc.csv
Code:
#!/usr/bin/env python
import os
import json
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import python_operator
from airflow.contrib.hooks import gcs_hook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators import PythonOperator, BranchPythonOperator
from airflow.operators import BashOperator
from lib import notification_utility

default_args = {
    'owner': os.environ["OWNER"],
    'depends_on_past': False,
    'start_date': '2019-10-10 09:31:00'
}

with DAG('parse_bad_rows',
         default_args=default_args,
         catchup=False,
         schedule_interval=None
         ) as dag:

    def parse_rows(**context):
        import pandas as pd
        import numpy as np
        import csv
        import os
        import gcsfs
        from tensorflow.python.lib.io import file_io
        from pandas.compat import StringIO
        import io
        #**tf.disable_v2_behavior() also tried disabling v1 just in case but i dont think it makes any sense**
        #updated_file_list = context['ti'].xcom_pull(task_ids='list_files_delta_bucket_test')
        fs = gcsfs.GCSFileSystem(project='project_name')
        updated_file_list = fs.ls('/bucket_name/foldername/')
        updated_file_list = [x for x in updated_file_list if "abc" in x]
        print("updated_file_list------------------>", updated_file_list)

        for f in updated_file_list:
            print("File Being processed------->", f)
            file_name = os.path.splitext(f)[0]
            #**this is where the job is failing while reading the file so I am assuming it has to do something with tensorflow.python.lib.io import file_io**
            file_stream = file_io.FileIO("gs://" + f, mode='r')
            lines = file_stream.read()
            file_stream_less_cols = io.StringIO(lines)
            Split_Rows = [x for x in file_stream_less_cols if x.count('|') < 297]
            Split_Rows = ' '.join(map(str, Split_Rows))
            file_stream.close()
            Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows), columns=['BLOB_COLUMN'], dtype='str')
            #Split_Rows_Stream['File_Name'] = Split_Rows_Stream.index
            parse_names = file_name.split('/')
            filename = parse_names[2]
            bucketname = parse_names[0]
            Split_Rows_Stream['FILE_NAME'] = filename
            print("bucketname------------>", bucketname)
            print("filename------------->", filename)
            Split_Rows_Stream.to_csv("gs://" + bucketname + "/ERROR_FILES/" + filename + ".csv", encoding='utf-8', quoting=csv.QUOTE_NONE, escapechar='|')

    Python_Task_Split_Rows = PythonOperator(
        task_id='split_rows_to_process_test',
        provide_context=True,
        python_callable=parse_rows,
        #op_kwargs={'project':'project_name','bucket':'bucket_name','table_name':'abc','delim_num':297},
        #trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag
    )

    # Orchestration
    Python_Task_Split_Rows
I also tried the same operation locally, to make sure the CSV itself was not the problem.
import pandas as pd
import numpy as np
import csv
import io
import os
# Read the files
directory = 'c:\\Users\\BG/Downloads/file_Cleansing'
for filename in os.listdir(directory):
    file_name = filename.split('.')[0]
    f = open('c:\\Users\\BG/Downloads/file_Cleansing/' + filename, 'r', encoding="utf8")
    # Read lines from the text file
    lines = f.read()
    # Cleanse the lines
    file_stream = io.StringIO(lines)
    Split_Rows = [x for x in file_stream if x.count('|') < 297]
    Split_Rows = ' '.join(map(str, Split_Rows))
    f.close()
    Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows), columns=['blob'])
    Split_Rows_Stream["File_Name"] = file_name
    Split_Rows_Stream.to_csv('c:\\Users\\BG/Downloads/file_Cleansed/' + filename + "_error.csv", escapechar='|', encoding='utf-8')
The above works as expected. My goal is to find the records whose delimiter count does not match what each row should have (my delimiter is the pipe character, and every row should contain 297 pipes since this CSV has 298 columns, but some rows have pipes inside the data itself), capture those records into a CSV, and load them into a BigQuery table so I can join the fragments back together (using SQL LEAD or LAG, ordering and grouping by file name and index number) and repair and recover as many records as possible; a rough sketch of such a repair query is below.
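For illustration only, here is a minimal sketch of that LEAD-based repair. The table and column names (`project.dataset.error_rows`, `row_index`) are hypothetical, and it assumes each broken record was split into exactly two consecutive fragments:

from google.cloud import bigquery

client = bigquery.Client(project='project_name')

# Hypothetical repair query: append to each fragment the next fragment
# from the same file, so split records can be reassembled downstream.
query = """
SELECT
  FILE_NAME,
  CONCAT(BLOB_COLUMN,
         LEAD(BLOB_COLUMN) OVER (PARTITION BY FILE_NAME ORDER BY row_index)) AS repaired_row
FROM `project.dataset.error_rows`
"""
for row in client.query(query).result():
    print(row.FILE_NAME, row.repaired_row)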
Lastly, my service account was changed recently, so this could also be some permissions issue on GCP.
Any suggestions are appreciated.
Thanks for your time.
Solution
This looks like a [permissions][1]-related issue. Verify that your service account is listed in the bucket's permissions, and that it has a role that allows it to read and/or write; a quick way to check is sketched below.
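A minimal sketch of such a check, assuming the google-cloud-storage client library is available ('bucket_name' is a placeholder); it asks GCS which of the listed permissions the active credentials actually hold on the bucket:

from google.cloud import storage

client = storage.Client(project='project_name')
bucket = client.bucket('bucket_name')
# Returns the subset of these permissions that the active service account holds.
granted = bucket.test_iam_permissions(
    ['storage.objects.get', 'storage.objects.list', 'storage.objects.create'])
print(granted)  # e.g. only 'storage.objects.get' if the account can read but not write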
I reproduced your scenario, using your code to read the file, and it works correctly:
from tensorflow.python.lib.io import file_io
import gcsfs
import os, io
fs = gcsfs.GCSFileSystem(project='project_name')
updated_file_list = fs.ls('bucket')
updated_file_list = [ x for x in updated_file_list if "filename" in x ]
print("updated_file_list------------------>",updated_file_list)
for f in updated_file_list:
    print("File Being processed------->", f)
    file_name = os.path.splitext(f)[0]
    #**this is where the job is failing while reading the file so I am assuming it has to do something with tensorflow.python.lib.io import file_io**
    file_stream = file_io.FileIO("gs://" + f, mode='r')
    lines = file_stream.read()
    print(lines)
Output:
('updated_file_list------------------>', [u'bucket/file'])
('File Being processed------->', u'bucket/file')
this is a text from a file
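One further observation, in case the permissions check out: 134217728 bytes is exactly 128 MiB, and the 134221820-byte response is slightly larger, so the failing read() appears to overflow a fixed 128 MiB buffer. A possible workaround (only a sketch, untested against your data) is to stream the object line by line through the gcsfs handle you already create, instead of reading the whole file in one call:

import gcsfs

fs = gcsfs.GCSFileSystem(project='project_name')
# Iterate line by line so no single 128+ MiB read buffer is ever needed.
with fs.open('bucket_name/foldername/abc.csv', 'r') as fh:
    Split_Rows = [x for x in fh if x.count('|') < 297]
print(len(Split_Rows))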