python - 如何删除错误“TypeError: can't concat str to bytes”
问题描述
我尝试使用GitHub 代码在 PC 上与 iOS 设备和 Python 建立 USB 连接。
上面的代码是在 Python2 中创建的,所以我跑来2to3 -w usbmux.py
将其转换为 Python3 代码。
Python3代码如下。
import socket, struct, select, sys
try:
import plistlib
haveplist = True
except:
haveplist = False
class MuxError(Exception):
pass
class MuxVersionError(MuxError):
pass
class SafeStreamSocket:
def __init__(self, address, family):
self.sock = socket.socket(family, socket.SOCK_STREAM)
self.sock.connect(address)
def send(self, msg):
totalsent = 0
while totalsent < len(msg):
sent = self.sock.send(msg[totalsent:])
if sent == 0:
raise MuxError("socket connection broken")
totalsent = totalsent + sent
def recv(self, size):
msg = ''
while len(msg) < size:
chunk = self.sock.recv(size-len(msg))
if chunk == '':
raise MuxError("socket connection broken")
msg = msg + chunk
return msg
class MuxDevice(object):
def __init__(self, devid, usbprod, serial, location):
self.devid = devid
self.usbprod = usbprod
self.serial = serial
self.location = location
def __str__(self):
return "<MuxDevice: ID %d ProdID 0x%04x Serial '%s' Location 0x%x>"%(self.devid, self.usbprod, self.serial, self.location)
class BinaryProtocol(object):
TYPE_RESULT = 1
TYPE_CONNECT = 2
TYPE_LISTEN = 3
TYPE_DEVICE_ADD = 4
TYPE_DEVICE_REMOVE = 5
VERSION = 0
def __init__(self, socket):
self.socket = socket
self.connected = False
def _pack(self, req, payload):
if req == self.TYPE_CONNECT:
return struct.pack("IH", payload['DeviceID'], payload['PortNumber']) + "\x00\x00"
elif req == self.TYPE_LISTEN:
return ""
else:
raise ValueError("Invalid outgoing request type %d"%req)
def _unpack(self, resp, payload):
if resp == self.TYPE_RESULT:
return {'Number':struct.unpack("I", payload)[0]}
elif resp == self.TYPE_DEVICE_ADD:
devid, usbpid, serial, pad, location = struct.unpack("IH256sHI", payload)
serial = serial.split("\0")[0]
return {'DeviceID': devid, 'Properties': {'LocationID': location, 'SerialNumber': serial, 'ProductID': usbpid}}
elif resp == self.TYPE_DEVICE_REMOVE:
devid = struct.unpack("I", payload)[0]
return {'DeviceID': devid}
else:
raise MuxError("Invalid incoming request type %d"%req)
def sendpacket(self, req, tag, payload={}):
payload = self._pack(req, payload)
if self.connected:
raise MuxError("Mux is connected, cannot issue control packets")
length = 16 + len(payload)
data = struct.pack("IIII", length, self.VERSION, req, tag) + payload
self.socket.send(data)
def getpacket(self):
if self.connected:
raise MuxError("Mux is connected, cannot issue control packets")
dlen = self.socket.recv(4)
dlen = struct.unpack("I", dlen)[0]
body = self.socket.recv(dlen - 4)
version, resp, tag = struct.unpack("III",body[:0xc])
if version != self.VERSION:
raise MuxVersionError("Version mismatch: expected %d, got %d"%(self.VERSION,version))
payload = self._unpack(resp, body[0xc:])
return (resp, tag, payload)
class PlistProtocol(BinaryProtocol):
TYPE_RESULT = "Result"
TYPE_CONNECT = "Connect"
TYPE_LISTEN = "Listen"
TYPE_DEVICE_ADD = "Attached"
TYPE_DEVICE_REMOVE = "Detached" #???
TYPE_PLIST = 8
VERSION = 1
def __init__(self, socket):
if not haveplist:
raise Exception("You need the plistlib module")
BinaryProtocol.__init__(self, socket)
def _pack(self, req, payload):
return payload
def _unpack(self, resp, payload):
return payload
def sendpacket(self, req, tag, payload={}):
payload['ClientVersionString'] = 'usbmux.py by marcan'
if isinstance(req, int):
req = [self.TYPE_CONNECT, self.TYPE_LISTEN][req-2]
payload['MessageType'] = req
payload['ProgName'] = 'tcprelay'
BinaryProtocol.sendpacket(self, self.TYPE_PLIST, tag, plistlib.writePlistToString(payload))
def getpacket(self):
resp, tag, payload = BinaryProtocol.getpacket(self)
if resp != self.TYPE_PLIST:
raise MuxError("Received non-plist type %d"%resp)
payload = plistlib.readPlistFromString(payload)
return payload['MessageType'], tag, payload
class MuxConnection(object):
def __init__(self, socketpath, protoclass):
self.socketpath = socketpath
if sys.platform in ['win32', 'cygwin']:
family = socket.AF_INET
address = ('127.0.0.1', 27015)
else:
family = socket.AF_UNIX
address = self.socketpath
self.socket = SafeStreamSocket(address, family)
self.proto = protoclass(self.socket)
self.pkttag = 1
self.devices = []
def _getreply(self):
while True:
resp, tag, data = self.proto.getpacket()
if resp == self.proto.TYPE_RESULT:
return tag, data
else:
raise MuxError("Invalid packet type received: %d"%resp)
def _processpacket(self):
resp, tag, data = self.proto.getpacket()
if resp == self.proto.TYPE_DEVICE_ADD:
self.devices.append(MuxDevice(data['DeviceID'], data['Properties']['ProductID'], data['Properties']['SerialNumber'], data['Properties']['LocationID']))
elif resp == self.proto.TYPE_DEVICE_REMOVE:
for dev in self.devices:
if dev.devid == data['DeviceID']:
self.devices.remove(dev)
elif resp == self.proto.TYPE_RESULT:
raise MuxError("Unexpected result: %d"%resp)
else:
raise MuxError("Invalid packet type received: %d"%resp)
def _exchange(self, req, payload={}):
mytag = self.pkttag
self.pkttag += 1
self.proto.sendpacket(req, mytag, payload)
recvtag, data = self._getreply()
if recvtag != mytag:
raise MuxError("Reply tag mismatch: expected %d, got %d"%(mytag, recvtag))
return data['Number']
def listen(self):
ret = self._exchange(self.proto.TYPE_LISTEN)
if ret != 0:
raise MuxError("Listen failed: error %d"%ret)
def process(self, timeout=None):
if self.proto.connected:
raise MuxError("Socket is connected, cannot process listener events")
rlo, wlo, xlo = select.select([self.socket.sock], [], [self.socket.sock], timeout)
if xlo:
self.socket.sock.close()
raise MuxError("Exception in listener socket")
if rlo:
self._processpacket()
def connect(self, device, port):
ret = self._exchange(self.proto.TYPE_CONNECT, {'DeviceID':device.devid, 'PortNumber':((port<<8) & 0xFF00) | (port>>8)})
if ret != 0:
raise MuxError("Connect failed: error %d"%ret)
self.proto.connected = True
return self.socket.sock
def close(self):
self.socket.sock.close()
class USBMux(object):
def __init__(self, socketpath=None):
if socketpath is None:
if sys.platform == 'darwin':
socketpath = "/var/run/usbmuxd"
else:
socketpath = "/var/run/usbmuxd"
self.socketpath = socketpath
self.listener = MuxConnection(socketpath, BinaryProtocol)
try:
self.listener.listen()
self.version = 0
self.protoclass = BinaryProtocol
except MuxVersionError:
self.listener = MuxConnection(socketpath, PlistProtocol)
self.listener.listen()
self.protoclass = PlistProtocol
self.version = 1
self.devices = self.listener.devices
def process(self, timeout=None):
self.listener.process(timeout)
def connect(self, device, port):
connector = MuxConnection(self.socketpath, self.protoclass)
return connector.connect(device, port)
if __name__ == "__main__":
mux = USBMux()
print("Waiting for devices...")
if not mux.devices:
mux.process(0.1)
while True:
print("Devices:")
for dev in mux.devices:
print(dev)
mux.process()
我在 Python 3.7 中运行了上面的代码并得到了错误。错误的详细信息如下。
File "C:\Users\taichi\Documents\usbmux.py", line 81, in sendpacket
data = struct.pack("IIII", length, self.VERSION, req, tag) + payload
TypeError: can't concat str to bytes
如何重写此代码以消除错误?
解决方案
struct.pack在 python 3 中返回字节,而不是在 python 2 中返回的字符串。查看这个答案以了解模拟 python 2 行为的一种方法。
编辑:实际上你可能需要做一些稍微不同的事情,因为它看起来像socket.send想要字节作为输入。因此,您需要将组合的有效负载转换为字节。
您可以尝试更改以下似乎填充有效负载的代码以在两种情况下都返回字节。请注意返回值中添加的“b”以指定字节而不是 str。您遇到了 TYPE_LISTEN 案例,但也应该测试另一种案例,因为它看起来有相同的错误试图连接 str 和字节:
def _pack(self, req, payload):
if req == self.TYPE_CONNECT:
return struct.pack("IH", payload['DeviceID'], payload['PortNumber']) + b"\x00\x00"
elif req == self.TYPE_LISTEN:
return b""
else:
raise ValueError("Invalid outgoing request type %d"%req)
您可能还需要在 PlistProtocol 中编辑 _pack。
推荐阅读
- ios - 在 Swift 视图控制器之间传递数据不适用于第一次 segue 转换
- sql - 向 LIMIT 关键字添加多行时,MS Sql Query 变慢
- typescript - 如何防止 Firebase 一遍又一遍地创建同一个用户?
- c# - 在 Unity 和 C# 中显示当前的悬停对象属性/名称
- javascript - 如何选择创建的元素?
- javascript - 删除 jsonObject 替换为 undefined
- mysql - 使用外部表中的变量进行计算的 Mysql 顺序
- java - 是否可以在指南行 AmCharts 中使用会话值?
- spring - Spring 使用 xml 与 Azure blob 存储集成
- c# - 领域驱动设计 - 如何实现始终有效的状态