Unicode encoding in a Python function

Problem description

I'm really struggling with this problem.

In the code below, I take a domain like something.facebook.com and use my UDF to convert it to facebook.com.

I get this error:

UnicodeEncodeError: 'ascii' codec can't encode characters in position 64-65: ordinal not in range(128)

I've tried a few things to fix it, but I really don't understand why it causes a problem.

I'd really appreciate any pointers :)

toplevel = ['.co.uk', '.co.nz', '.com', '.net', '.uk', '.org', '.ie', '.it', '.gov.uk', '.news', '.co.in',
  '.io', '.tw', '.es', '.pe', '.ca', '.de', '.to', '.us', '.br', '.im', '.ws', '.gr', '.cc', '.cn', '.me', '.be',
  '.tv', '.ru', '.cz', '.st', '.eu', '.fi', '.jp', '.ai', '.at', '.ch', '.ly', '.fr', '.nl', '.se', '.cat', '.com.au',
  '.com.ar', '.com.mt', '.com.co', '.org.uk', '.com.mx', '.tech', '.life', '.mobi', '.info', '.ninja', '.today', '.earth', '.click']

def cleanup(domain):
    print(domain)
    if domain is None or domain == '':
        domain = 'empty'
        return domain
    for tld in toplevel:
        if tld in str(domain):
            splitdomain = domain.split('.')
            ext = tld.count('.')
            if ext == 1:
                cdomain = domain.split('.')[-2].encode('utf-8') + '.' + domain.split('.')[-1].encode('utf-8')
                return cdomain
            elif ext == 2:
                cdomain = domain.split('.')[-3].encode('utf-8') + '.' + domain.split('.')[-2].encode('utf-8') + '.' + domain.split('.')[-1].encode('utf-8')
                return cdomain
            elif domain == '':
                cdomain = 'empty'
                return cdomain
            else:
                return domain
# IPFR DOMS
output = ipfr_logs.withColumn('capital',udfdict(ipfr_logs.domain)).createOrReplaceTempView('ipfrdoms')
ipfr_table_output = spark_session.sql('insert overwrite table design.ipfr_tld partition(dt=' + yday_date + ') select dt, hour, vservername, loc, cast(capital as string), count(distinct(emsisdn)) as users, sum(bytesdl) as size from ipfrdoms group by dt, hour, vservername, loc, capital')
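
For context, udfdict above is presumably cleanup registered as a Spark UDF. A minimal sketch of what that registration typically looks like is below; the explicit StringType return type and the reuse of the name udfdict are assumptions, since the original registration is not shown in the question:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical registration of cleanup() as a string-typed Spark UDF;
# the original script's actual definition of `udfdict` is not shown.
udfdict = udf(cleanup, StringType())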

Here is the full traceback:

Traceback (most recent call last):
  File "/data/keenek1/py_files/2020_scripts/web_ipfr_complete_new.py", line 177, in <module>
    ipfr_table_output = spark_session.sql('insert overwrite table design.ipfr_tld partition(dt=' + yday_date + ') select dt, hour, vservername, loc, cast(capital as string), count(distinct(emsisdn)) as users, sum(bytesdl) as size from ipfrdoms group by dt, hour, vservername, loc, capital')
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/session.py", line 714, in sql
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.sql.
: org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
        at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:87)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 685 in stage 0.0 failed 4 times, most recent failure: Lost task 685.3 in stage 0.0 (TID 4945, uds-far-dn112.dab.02.net, executor 22): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 229, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 149, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 74, in <lambda>
    return lambda *a: f(*a)
  File "/data/keenek1/py_files/2020_scripts/web_ipfr_complete_new.py", line 147, in cleanup
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-3: ordinal not in range(128)
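
For reference, this particular message ("'ascii' codec can't encode characters ...") is what Python 2 raises when a unicode string containing non-ASCII characters is passed to str(), which implicitly encodes with the ASCII codec. A minimal sketch that reproduces it outside Spark, assuming the executors run Python 2 (the domain value is an invented example, not data from the job):

# -*- coding: utf-8 -*-
# Minimal reproduction sketch: str() on non-ASCII unicode fails under Python 2.
domain = u'b\xfccher.example.com'   # unicode string containing the non-ASCII character ü

domain.encode('utf-8')  # fine: explicit UTF-8 encoding returns a byte string
str(domain)             # UnicodeEncodeError: 'ascii' codec can't encode character u'\xfc' ...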

Tags: python, python-3.x, apache-spark

Solution


This seems to fix it. Interestingly, the output is never "unicode error".

def cleanup(domain):
    try:
        if domain is None or domain == '':
            domain = 'empty'
            return str(domain)
        for tld in toplevel:
            if tld in domain:
                splitdomain = domain.split('.')
                ext = tld.count('.')
                if ext == 1:
                    cdomain = domain.split('.')[-2] + '.' + domain.split('.')[-1]
                    return str(cdomain)
                elif ext == 2:
                    cdomain = domain.split('.')[-3] + '.' + domain.split('.')[-2] + '.' + domain.split('.')[-1]
                    return str(cdomain)
                elif domain == '':
                    cdomain = 'empty'
                    return str(cdomain)
                else:
                    return str(domain)
    except UnicodeEncodeError:
        domain = 'unicode error'
        return domain
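
As a quick sanity check, the reworked function can be exercised in plain Python and then re-registered as a string-typed UDF before rerunning the insert. A minimal sketch, where the sample domains and the reuse of the name udfdict are assumptions for illustration:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Plain-Python checks of the reworked cleanup(); sample inputs are invented.
print(cleanup('something.facebook.com'))   # facebook.com
print(cleanup('news.bbc.co.uk'))           # bbc.co.uk
print(cleanup(None))                       # empty

# Re-register with an explicit string return type before rebuilding ipfrdoms.
udfdict = udf(cleanup, StringType())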
