python - 有问题的字符串参数 ctypes python 与 C 代码
问题描述
我正在为一段 C 代码编写 Python 包装器。我向 C 代码传递了多个选项,除第三个参数外,一切正常。第三个参数是一个指向日志文件名的指针,我通过对 Python 字符串(path_to_log_file)进行编码来编组该数据。这个参数本应把 shell 的输出流重定向到日志文件,但它并不起作用:我得到的结果却是一个奇怪的文件。我想我遗漏了什么,但既然已经对字符串做了编码,我不明白问题出在哪里。C 函数签名:
int lisaf_phasemeter_main(int argc, char* argv[], const char *LogFileName)
驱动代码:
#%% Module import
import signal
import ctypes
import os
import sys
import json
import multiprocessing
#%% Constant definition.
# Options accepted by the driver itself
# (this list is not referenced anywhere else in the visible code)
ACCEPTED_DRIVER_ARGS=["logfile"]
# Options accepted by the C code:
# - ACCEPTED_ARGS are flags, forwarded as "--<name>"
# - ACCEPTED_KWARGS carry a value, forwarded as "--<name>" followed by the value
ACCEPTED_ARGS = ["erase","binary","help","intclk"]
ACCEPTED_KWARGS= ["fsampling","comment","trun","vcal"]
# Opening the config file and parsing its JSON content into a dictionary
# NOTE(review): the file object is never closed in the visible code
PATH_TO_CONFIG_FILE="config/Config.json"
ConfigFile = open(PATH_TO_CONFIG_FILE, "r")
jsonContent = ConfigFile.read()
Config = json.loads(jsonContent)
# Driver's name (used as the first entry of the argv list built in start())
PROGRAM_NAME=str(__file__)
# Installing the default SIGINT action to enable interruption of the C code
signal.signal(signal.SIGINT,signal.SIG_DFL)
#%% Class definiton
class Phasemeter :
""" This class is a driver for LISA Phasemeters data acquisiton """
# NOTE(review): this is the listing as posted in the question; its indentation
# was lost, and it contains the argument-type mismatch the question is about.
# List of all Phasemeter objects created so far (appended to in __init__)
phase_list=[]
def __init__(self,name="PH1") :
""" This method define path, enables interuptions of C code and defines
C code signature
TODO : Paralelization
"""
# `name` selects the section of the module-level Config dictionary to read
self.name = name
# Path to driver configuration File
self.path_to_cfg_file=Config[self.name]["PATH_TO_CFG_FILE"]
# Path to data File
self.path_to_data_file=Config[self.name]["PATH_TO_DATA_FILE"]
# Path to log file
self.path_to_log_file=Config[self.name]["PATH_TO_LOG_FILE"]
# Path to Shared Object File. Shared Object (SO) File is the output
# of C code used by this program to use the C Code.
self.path_to_so_file=Config[self.name]["PATH_TO_SO_FILE"]
self.so_namefile=Config[self.name]["SO_NAMEFILE"]
# Defining the absolute path to the shared object file's directory
self.path=os.path.abspath(os.path.join(os.getcwd(),
self.path_to_so_file))
self.libname = self.so_namefile
# Loading the shared object
self.LIBC = ctypes.CDLL(os.path.join(self.path,self.libname))
# Setting the C function's signature datatypes
# NOTE(review): the third entry declares a `char **`, whereas the C signature
# quoted with this listing takes `const char *LogFileName`.
self.LIBC.lisaf_phasemeter_main.argtypes= [ctypes.c_int,
ctypes.POINTER(ctypes.c_char_p),
ctypes.POINTER(ctypes.c_char_p),]
# NOTE(review): ctypes documents the attribute as `restype` (singular, a single
# type); `restypes` presumably has no effect - confirm.
self.LIBC.lisaf_phasemeter_main.restypes = [ctypes.c_int,]
# Registering this instance in the class-level list
Phasemeter.phase_list.append(self)
print(Phasemeter.phase_list)
def start(self,*args,**kwargs):
""" This method starts acquisition for the Phasemeter. It handles
options and launches the Phasemeter's C code for acquisiiton.
-----------------------------------------------------------------------
Simple arguments :
(str) erase: Erase previous Data file without asking permission
(default is off)
-> my_phasemeter.start("erase")
(str) binary: Change output format to binary (default is txt)
-> my_phasemeter.start("binary")
(str) intclk: Call internal clock (unstable)
-> my_phasemeter.start("intclk")
(str) help: Call c code's help (not very usefull to use the wrapper)
-> my_phasemeter.start("help")
Keywords arguments :
(int) fsampling: Sampling frequency (default is 0.0)
(float) -> my_phasemeter.start(fsampling=#value)
(int) vcal: Calibration Voltage (default is 0.0)
(float) -> my_phasemeter.start(vcal=#value)
(int) trun: Specify acquisition time,any negative results in infinite
(float) acquisition time (default is 0.0)
-> my_phasemeter.start(trun=#your_value)
(str) comment: Add a comment to the data file's header
-> my_phasemeter.start(comment="#your_comment")
-----------------------------------------------------------------------
Method call examples
: my_phasemeter.start()
: my_phasemeter.start("erase",fsampling=100,trun=50,
comment="Test")
: my_phasemeter.start("help")
: my_phasemeter.start("intclk","erase",comment="Test",
fsampling=100,trun=100,vcal=2)
-----------------------------------------------------------------------
"""
# Set to True below when the positional option "logfile" is given
self.log_flag= False
# Robustness
# Checking simple arguments
for argument in args :
# Checking if args are string type
if type(argument) is not str :
# Setting the interpreter-wide traceback limit to 0 before raising
sys.tracebacklimit = 0
raise TypeError("Wrong type for argument ",argument,
", only strings are authorized")
# Checking keyword arguments
for argument in kwargs :
# NOTE(review): int() truncates toward zero, so e.g. -0.5 passes this check
if argument in ["fsampling","vcal"] and int(kwargs[argument]) < 0 :
raise ValueError('Please enter a positive number for keyword ',
argument)
if argument in ["comment","logfile"] and type(kwargs[argument]) is not str :
# NOTE(review): the message on the next line is a separate expression,
# so TypeError is raised without it
raise TypeError
("Please enter a string for keyword argument",argument)
# Changing data structure to fit Marshalling
# Here we want to mimic C code's argv
# Storing valid arguments
args_list=[]
# Storing non-transmitted arguments
ignored_args=[]
# Adding core arguments
args_list.append(PROGRAM_NAME)
args_list.append(self.path_to_cfg_file)
args_list.append(self.path_to_data_file)
# Handling optional arguments
for argument in args :
if argument in ACCEPTED_ARGS :
args_list.append(str("--"+str(argument)))
elif argument == "logfile":
self.log_flag = True
else :
ignored_args.append(argument)
for argument in kwargs :
if argument in ACCEPTED_KWARGS :
args_list.append(str("--"+str(argument)))
args_list.append(str(kwargs[argument]))
else :
ignored_args.append(argument)
# Copying the argument list
options=[*args_list]
# Debug
print("Options transmitted to C Code : ",*options)
if ignored_args:
print("Ignored given options : ",*ignored_args)
# NOTE(review): a positional "logfile" is never added to ignored_args (see the
# elif above), so this only matches "logfile" passed as a keyword
if "logfile" in ignored_args:
print("Logfile option activated")
# Marshalling data for C library: array of C strings built from the options
self.c_char_pointer_array = ctypes.c_char_p * len(options)
self.encoded_options = [str.encode(str(i)) for i in options ]
self.encoded_options = self.c_char_pointer_array (*self.encoded_options)
# If Logfile option activated we encode the path to the log file
# NOTE(review): this wraps the path in a one-element c_char_p array, i.e. a
# `char **` on the C side, not a plain `char *`
if self.log_flag :
self.c_char_pointer_array = ctypes.c_char_p * 1
self.encoded_path_to_log_file = [str.encode(self.path_to_log_file)]
self.encoded_path_to_log_file = self.c_char_pointer_array (*self.encoded_path_to_log_file)
# Calling C library with encoded options
# If the logfile option is activated then the encoded string is transmitted
if self.log_flag :
self.status = self.LIBC.lisaf_phasemeter_main(len(self.encoded_options),
self.encoded_options,self.encoded_path_to_log_file)
# Otherwise None pointer is transmitted
else :
self.status = self.LIBC.lisaf_phasemeter_main(len(self.encoded_options),
self.encoded_options,None)
解决方案
As Dan said, the issue is that the code passes a char** argument to a function that takes a char * parameter.
So I figured out how to fix this mistake by changing two things:
The signature of the C function in the Python code :
self.LIBC.lisaf_phasemeter_main.argtypes=[ctypes.c_int,ctypes.POINTER(ctypes.c_char_p),ctypes.c_char_p,]
The data structure passed to the C function
if "logfile" in args : self.encoded_path_to_log_file = str.encode(self.path_to_log_file)
Full code :
#%% Module import
import signal
import ctypes
import os
import sys
import json
import multiprocessing
#%% Constant definition.
# Options accepted by the driver itself (handled in Python, not sent to C)
ACCEPTED_DRIVER_ARGS = ["logfile"]
# Options accepted by the C code:
# - ACCEPTED_ARGS are flags, forwarded as "--<name>"
# - ACCEPTED_KWARGS carry a value, forwarded as "--<name>" followed by the value
ACCEPTED_ARGS = ["erase", "binary", "help", "intclk"]
ACCEPTED_KWARGS = ["fsampling", "comment", "trun", "vcal"]
# Reading the JSON configuration file and parsing it into a dictionary.
# The `with` block closes the file as soon as its content has been read; the
# names ConfigFile / jsonContent / Config are kept for backward compatibility.
PATH_TO_CONFIG_FILE = "config/Config.json"
with open(PATH_TO_CONFIG_FILE, "r") as ConfigFile:
    jsonContent = ConfigFile.read()
Config = json.loads(jsonContent)
# Driver's name (used as argv[0] when calling the C code)
PROGRAM_NAME = str(__file__)
# Installing the default SIGINT action to enable interruption of the C code
signal.signal(signal.SIGINT, signal.SIG_DFL)
#%% Class definiton
class Phasemeter:
    """Driver for LISA phasemeter data acquisition.

    Each instance reads its paths from the module-level ``Config`` dictionary,
    loads the shared object built from the C code and exposes the C entry
    point ``lisaf_phasemeter_main`` through :meth:`start`.
    """

    # List of all Phasemeter objects created so far
    phase_list = []

    def __init__(self, name="PH1"):
        """Read the paths for *name* from ``Config`` and load the C library.

        Args:
            name: Key of this phasemeter's section in ``Config``.

        Raises:
            KeyError: If *name* or one of the expected path entries is
                missing from ``Config``.
            OSError: If the shared object cannot be loaded.

        TODO: Parallelization
        """
        self.name = name
        section = Config[self.name]
        # Path to driver configuration file
        self.path_to_cfg_file = section["PATH_TO_CFG_FILE"]
        # Path to data file
        self.path_to_data_file = section["PATH_TO_DATA_FILE"]
        # Path to log file
        self.path_to_log_file = section["PATH_TO_LOG_FILE"]
        # Directory and file name of the shared object (SO) file, i.e. the
        # compiled C code this driver calls into
        self.path_to_so_file = section["PATH_TO_SO_FILE"]
        self.so_namefile = section["SO_NAMEFILE"]
        # abspath() already resolves relative paths against the current
        # working directory
        self.path = os.path.abspath(self.path_to_so_file)
        self.libname = self.so_namefile
        self.LIBC = ctypes.CDLL(os.path.join(self.path, self.libname))
        # C signature:
        #   int lisaf_phasemeter_main(int argc, char *argv[],
        #                             const char *LogFileName)
        self.LIBC.lisaf_phasemeter_main.argtypes = [
            ctypes.c_int,
            ctypes.POINTER(ctypes.c_char_p),
            ctypes.c_char_p,
        ]
        # ctypes reads `restype` (singular, a single type), not `restypes`
        self.LIBC.lisaf_phasemeter_main.restype = ctypes.c_int
        Phasemeter.phase_list.append(self)
        print(Phasemeter.phase_list)

    def start(self, *args, **kwargs):
        """Start an acquisition by calling the phasemeter's C code.

        The options are validated, turned into an ``argv``-like list
        (program name, configuration file, data file, then the options) and
        passed to ``lisaf_phasemeter_main``. The value returned by the C
        function is stored in ``self.status``.

        Positional options (strings):
            "erase":   Erase previous data file without asking permission
                       (default is off).
            "binary":  Change output format to binary (default is txt).
            "intclk":  Call internal clock (unstable).
            "help":    Call the C code's help.
            "logfile": Redirect the shell output to the configured log file.

        Keyword options:
            fsampling (int or float): Sampling frequency (default is 0.0).
            vcal (int or float): Calibration voltage (default is 0.0).
            trun (int or float): Acquisition time; any negative value results
                in infinite acquisition time (default is 0.0).
            comment (str): Comment added to the data file's header.

        Any other option is not transmitted to the C code and is reported on
        standard output.

        Raises:
            TypeError: If a positional option is not a string, or if
                ``comment`` / ``logfile`` is given as a non-string keyword.
            ValueError: If ``fsampling`` or ``vcal`` is negative.

        Examples:
            my_phasemeter.start()
            my_phasemeter.start("erase", fsampling=100, trun=50,
                                comment="Test")
            my_phasemeter.start("help")
            my_phasemeter.start("intclk", "erase", "logfile", comment="Test",
                                fsampling=100, trun=100, vcal=2)
        """
        # --- Robustness: validate everything before touching the C code ---
        for argument in args:
            if not isinstance(argument, str):
                # Kept from the original driver: hide the traceback so only
                # the error message is shown (interpreter-wide setting)
                sys.tracebacklimit = 0
                raise TypeError(
                    "Wrong type for argument {!r}, only strings are "
                    "authorized".format(argument))
        for keyword, value in kwargs.items():
            # float() rather than int(): int() truncates toward zero, which
            # would let values such as -0.5 slip through
            if keyword in ("fsampling", "vcal") and float(value) < 0:
                raise ValueError(
                    "Please enter a positive number for keyword "
                    "{}".format(keyword))
            if keyword in ("comment", "logfile") and not isinstance(value, str):
                raise TypeError(
                    "Please enter a string for keyword argument "
                    "{}".format(keyword))

        # --- Build the list mimicking the C code's argv ---
        # Core arguments first: program name, configuration file, data file
        args_list = [PROGRAM_NAME, self.path_to_cfg_file,
                     self.path_to_data_file]
        # Options that are not transmitted to the C code
        ignored_args = []
        log_requested = False
        for argument in args:
            if argument in ACCEPTED_ARGS:
                args_list.append("--" + argument)
            elif argument == "logfile":
                # Driver-level option: passed as the third C parameter
                # instead of being part of argv
                log_requested = True
            else:
                ignored_args.append(argument)
        for keyword, value in kwargs.items():
            if keyword in ACCEPTED_KWARGS:
                args_list.append("--" + keyword)
                args_list.append(str(value))
            else:
                ignored_args.append(keyword)

        # Debug
        print("Options transmitted to C Code : ", *args_list)
        if ignored_args:
            print("Ignored given options : ", *ignored_args)
        # Verbose
        if log_requested:
            print("Logfile option activated, logs going to : ",
                  self.path_to_log_file)

        # --- Marshalling data for the C library ---
        encoded = [str(option).encode() for option in args_list]
        argc = len(encoded)
        # One extra slot holds a trailing NULL so that argv[argc] is a null
        # pointer, as C guarantees for the argv received by main()
        self.c_char_pointer_array = ctypes.c_char_p * (argc + 1)
        self.encoded_options = self.c_char_pointer_array(*encoded, None)
        # A plain bytes object is marshalled as `const char *`; None is
        # marshalled as a NULL pointer when the log file is not requested
        if log_requested:
            self.encoded_path_to_log_file = str.encode(self.path_to_log_file)
        else:
            self.encoded_path_to_log_file = None

        # --- Calling the C library with the encoded options ---
        self.status = self.LIBC.lisaf_phasemeter_main(
            argc, self.encoded_options, self.encoded_path_to_log_file)
推荐阅读
- php - 根据 ROW 插入数据
- android - 如何从 Android 上的 Firebase 中的特定节点检索所有子数据?
- reactjs - React 本机过滤 API 平面列表
- python - 如何解决 ajax 代码中 FLASK 的 404 错误?
- c# - 在控制器级别向遥测请求添加自定义属性
- python - 在 dask 或 Dramatiq 中带有 (bind=True) 的 Celery?
- node.js - How to configure Port Forwarding with Google Cloud Compute Engine for a Node.JS application
- html - 防止从文件夹自动加载 index.html
- reactjs - React 编译失败(未定义计数器)
- javascript - React:如何将一组 Ids 从嵌套对象添加到状态对象