首页 > 解决方案 > 在数据流中查找两个子字符串之间的字符串

问题描述

我有这个连续的串行数据流:

----------------------------------------
 
SENSOR COORDINATE         = 0
 
MEASURED RESISTANCE       = 3.70 kOhm
 
----------------------------------------
 
----------------------------------------
 
SENSOR COORDINATE         = 1
 
MEASURED RESISTANCE       = 3.70 kOhm
 
----------------------------------------
 
----------------------------------------
 
SENSOR COORDINATE         = 2
 
MEASURED RESISTANCE       = 3.69 kOhm
 
----------------------------------------

对于每次迭代,我希望能够获取值。传感器坐标值和电阻值。

我找到了使用.split()和使用正则表达式的解决方案( 在两个子字符串之间查找字符串),但问题是在我的情况下,我想要过滤的不是一个字符串,而是一个连续的流。

例如,.split()会找到我的字符串,但它会将流分成两半。这在连续流中不止一次起作用。

注意:在传感器坐标值之后,我有一个回车符。

编辑 1/3:这是获取串行数据的代码片段:

def readSerial():
    global after_id
    while ser.in_waiting:
        try:
            ser_bytes = ser.readline() #read data from the serial line
            ser_bytes = ser_bytes.decode("utf-8")
            text.insert("end", ser_bytes)
        except UnicodeDecodeError:
            print("UnicodeDecodeError")
    else:
        print("No data received")
    after_id=root.after(50,readSerial)

如果有人想知道,这是 arduino 端的 C 代码,它发送数据:

Serial.println("----------------------------------------");
Serial.print("SENSOR COORDINATE         = ");
Serial.println(sensor_coord);
Serial.print("MEASURED RESISTANCE       = ");
double resistanse = ((period * GAIN_VALUE * 1000) / (4 * CAPACITOR_VALUE)) - R_BIAS_VALUE;
Serial.print(resistanse);
Serial.println(" kOhm");

编辑 2/3:这是以前的方法:

def readSerial():
        global after_id
        while ser.in_waiting:
            try:
                ser_bytes = ser.readline() #read data from the serial line
                ser_bytes = ser_bytes.decode("utf-8")
                text.insert("end", ser_bytes)
                result = re.search.(, ser_bytes)
                print(result)
            except UnicodeDecodeError:
                print("UnicodeDecodeError")
        else:
            print("No data received")
        after_id=root.after(50,readSerial)

在另一次尝试中,我将此行更改result = re.search.(, ser_bytes)result =ser_bytes.split("TE = ").

这是我收到的数据的图片(这是一个 tkinter 文本框架)。 在此处输入图像描述

编辑 3/3:这是我实现 dracarys 算法的代码:

def readSerial():
    global after_id
    while ser.in_waiting:
        try:
            ser_bytes = ser.readline() 
            print(ser_bytes)
            ser_bytes = ser_bytes.decode("utf-8")
            print(ser_bytes)
            text.insert("end", ser_bytes)
           
            if "SENSOR COORDINATE" in ser_bytes:
               found_coordinate = True
               coordinate = int(ser_bytes.split("=")[1].strip())
               print("Coordinate",coordinate)
            if "MEASURED RESISTANCE" in ser_bytes and found_coordinate:
               found_coordinate = False
               resistance = float(ser_bytes.split("=")[1].split("kOhm")[0].strip())
               print("Resistance",resistance)
        
        except UnicodeDecodeError:
            print("UnicodeDecodeError")
    else:
        print("No data received")
    after_id=root.after(50,readSerial)

这是我得到的错误,在代码成功运行大约十秒钟后(我也包含了正常操作输出以供参考):

No data received
b'SENSOR COORDINATE         = 2\r\n'
SENSOR COORDINATE         = 2

Coordinate 2
b'MEASURED RESISTANCE       = 3.67 kOhm\r\n'
MEASURED RESISTANCE       = 3.67 kOhm

Resistance 3.67
b'----------------------------------------\r\n'
----------------------------------------

b'----------------------------------------\r\n'
----------------------------------------

b'SENSOR COORDINATE         = 3\r\n'
SENSOR COORDINATE         = 3

Coordinate 3
No data received
b'MEASURED RESISTANCE       = 3.78 kOhm\r\n'
MEASURED RESISTANCE       = 3.78 kOhm

Exception in Tkinter callback
Traceback (most recent call last):
  File "C:\Users\User1\AppData\Local\Programs\Python\Python38-32\lib\tkinter\__i
nit__.py", line 1883, in __call__
    return self.func(*args)
  File "C:\Users\User1\AppData\Local\Programs\Python\Python38-32\lib\tkinter\__i
nit__.py", line 804, in callit
    func(*args)
  File "tkinterWithPortsExperiment.py", line 73, in readSerial
    if "MEASURED RESISTANCE" in ser_bytes and found_coordinate:
UnboundLocalError: local variable 'found_coordinate' referenced before assignment

标签: pythonpython-3.xregexstringsplit

解决方案


正如我在评论中所说,我觉得应该简化 Arduino 输出。正如@oliver_t 所说,每个传感器事件的单行 JSON 将是完美的。

如果你不能这样做,这里是解析这个的代码。

由于我没有任何方式逐行接收您的串行监视器输出,因此我通过将输出存储在 txt 文件中然后逐行读取来模拟这一点。我希望这会有所帮助,因为您的问题是如何解析输入。

f = open('stream.txt', 'r')
global found_coordinate
found_coordinate = False
while True:
    line = f.readline()
    if not line:
        break
    
    if "SENSOR COORDINATE" in line:
        found_coordinate = True
        coordinate = int(line.split("=")[1].strip())
        print("Coordinate",coordinate)
    
    if "MEASURED RESISTANCE" in line and found_coordinate:
        found_coordinate = False
        resistance = float(line.split("=")[1].split("kOhm")[0].strip())
        print("Resistance",resistance)

我希望这会有所帮助,如果我对您的要求的理解有任何差异,请告诉我,以便我修复我的代码。

注意:您实际上可能不需要.strip()对 a 进行类型转换intfloat处理它,但是我仍然将它放在那里作为健全性检查


推荐阅读