python-3.x - 使用 Mikrotik API 脚本登录后如何修复卡住?
问题描述
我正在写一篇关于 Mikrotik 脚本的论文。我尝试了 Mikrotik 官网上提供的 API 示例,但在登录方法收到路由器返回的 !done 消息之后,我就无法再向 Mikrotik 发送任何查询,脚本似乎在登录后卡住了。我已经试过以前类似问题中的一些建议,甚至把我的脚本与另一个 API 脚本结合起来,但它仍然卡住。该如何解决这个问题?谢谢。
这是我的主要代码:(基本上这段代码与 Mikrotik API 相同)
def main():
    """Log in to a RouterOS device and relay sentences typed on stdin.

    Command line: ``script host [user [password]]``; the credentials default
    to ``admin`` with an empty password.  Lines read from stdin are collected
    into a sentence, and an empty line sends the collected sentence.
    """
    # Fall back to the default credentials for anything not given explicitly.
    argc = len(sys.argv)
    user = sys.argv[2] if argc in (3, 4) else "admin"
    passw = sys.argv[3] if argc == 4 else ""

    # Try every address the host resolves to and keep the first that connects.
    sock = None
    for family, kind, proto, _canonname, address in socket.getaddrinfo(
            sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
        try:
            candidate = socket.socket(family, kind, proto)
        except socket.error:
            continue
        try:
            candidate.connect(address)
        except socket.error:
            candidate.close()
            continue
        sock = candidate
        break

    if sock is None:
        print('could not open socket')
        sys.exit(1)

    mikrotik = Mikrotik(sock)
    mikrotik.login(user, passw)

    pending = []
    while True:
        # NOTE: waiting on sys.stdin here is what raises WinError 10038 on
        # Windows, where select() accepts sockets only.
        ready, _, _ = select.select([sock, sys.stdin], [], [], None)

        if sock in ready:
            # Something arrived on the socket: consume one sentence.
            mikrotik.readSentence()

        if sys.stdin in ready:
            # Read one input line and drop its trailing newline character.
            line = sys.stdin.readline()[:-1]
            if not line:
                # An empty line sends the sentence and starts a new one.
                mikrotik.writeSentence(pending)
                pending = []
            else:
                pending.append(line)


if __name__ == '__main__':
    main()
我希望脚本能让我向 Mikrotik 路由器发送查询,但不幸的是,脚本在输出 !done 消息之后就停止了,并返回如下错误信息:
in main function
r = select.select([s, sys.stdin], [], [], None)
OSError: [WinError 10038] An operation was attempted on something that is not a socket
解决方案
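这是我对同一个 python 模块稍作修改后的版本。它在 python 3+ 和 RouterOS 6.3+ 中运行良好。注意:问题中的 WinError 10038 是因为在 Windows 上 select.select() 只能用于套接字,不能用于 sys.stdin;在 Windows 上运行时,需要改用其他方式(例如单独的线程)读取标准输入。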
#!/usr/bin/python3
# -*- coding: latin-1 -*-
import sys, time, binascii, socket, select
import hashlib
class ApiRos:
    """Minimal client for the MikroTik RouterOS API wire protocol.

    A *word* is a length-prefixed byte string and a *sentence* is a run of
    words terminated by a zero-length word.  Words are handled as ``str`` by
    callers and are encoded/decoded as UTF-8 on the wire; the length prefix
    always counts encoded bytes, never characters.
    """

    def __init__(self, sk, silent: bool = False):
        """Wrap *sk*, a connected socket-like object offering ``send``/``recv``.

        Unless *silent* is true, every word sent or received is echoed to
        stdout with a ``<<<`` / ``>>>`` prefix.
        """
        self.sk = sk
        self.currenttag = 0
        self.silent = silent

    def login(self, username, pwd):
        """Log in as *username*; return False on a ``!trap`` reply, else True.

        The name and password are sent in plain text first.  If the reply
        carries an ``=ret`` challenge, a second ``/login`` is sent with an MD5
        challenge response built from a zero byte, the password and the
        challenge.
        """
        for repl, attrs in self.talk(["/login", "=name=" + username,
                                      "=password=" + pwd]):
            if repl == '!trap':
                return False
            elif '=ret' in attrs:
                # The challenge and response are hex strings, hence ASCII.
                # The password is hashed in the same encoding it is sent in.
                chal = binascii.unhexlify(attrs['=ret'].encode('ascii'))
                md = hashlib.md5()
                md.update(b'\x00')
                md.update(pwd.encode('utf-8'))
                md.update(chal)
                response = binascii.hexlify(md.digest()).decode('ascii')
                for repl2, attrs2 in self.talk(["/login", "=name=" + username,
                                                "=response=00" + response]):
                    if repl2 == '!trap':
                        return False
        return True

    def talk(self, words):
        """Send the sentence *words* and collect replies up to ``!done``.

        Returns a list of ``(reply_word, attrs)`` tuples, where *attrs* maps
        each ``=key`` to its value.  Returns None when *words* is empty.
        """
        if self.writeSentence(words) == 0:
            return None
        replies = []
        while True:
            sentence = self.readSentence()
            if not sentence:
                continue
            reply = sentence[0]
            attrs = {}
            for w in sentence[1:]:
                # Search from index 1 so "=key=value" splits at the second '='.
                j = w.find('=', 1)
                if j == -1:
                    attrs[w] = ''
                else:
                    attrs[w[:j]] = w[j + 1:]
            replies.append((reply, attrs))
            if reply == '!done':
                return replies

    def writeSentence(self, words):
        """Send every word in *words* plus the terminating empty word.

        Returns the number of words written, not counting the terminator.
        """
        ret = 0
        for w in words:
            self.writeWord(w)
            ret += 1
        self.writeWord('')
        return ret

    def readSentence(self):
        """Read words until an empty word arrives and return them as a list."""
        r = []
        while True:
            w = self.readWord()
            if w == '':
                return r
            r.append(w)

    def writeWord(self, w):
        """Send one word: its encoded byte length, then its UTF-8 bytes."""
        if not self.silent:
            print("<<< " + w)
        # Encode once so the length prefix and the payload always agree, even
        # when the word contains non-ASCII characters.
        data = w.encode('utf-8')
        self.writeLen(len(data))
        self.writeByte(data)

    def readWord(self):
        """Read one length-prefixed word and return it as ``str``."""
        # Decode only after the whole word has arrived, so a multi-byte
        # character split across recv() calls is not damaged.
        ret = self._recvExact(self.readLen()).decode('utf-8', 'replace')
        if not self.silent:
            print(">>> " + ret)
        return ret

    def writeLen(self, l):
        """Send the length *l* in the protocol's 1- to 5-byte encoding.

        The high bits of the first byte tell the reader how many bytes follow;
        multi-byte lengths are written most significant byte first.
        """
        if l < 0x80:
            encoded = l.to_bytes(1, 'big')
        elif l < 0x4000:
            encoded = (l | 0x8000).to_bytes(2, 'big')
        elif l < 0x200000:
            encoded = (l | 0xC00000).to_bytes(3, 'big')
        elif l < 0x10000000:
            encoded = (l | 0xE0000000).to_bytes(4, 'big')
        else:
            encoded = b'\xF0' + l.to_bytes(4, 'big')
        self.writeByte(encoded)

    def readLen(self):
        """Read and decode a length prefix written in the ``writeLen`` format."""
        c = self._recvExact(1)[0]
        if (c & 0x80) == 0x00:
            extra = 0
        elif (c & 0xC0) == 0x80:
            c &= 0x3F
            extra = 1
        elif (c & 0xE0) == 0xC0:
            c &= 0x1F
            extra = 2
        elif (c & 0xF0) == 0xE0:
            c &= 0x0F
            extra = 3
        elif (c & 0xF8) == 0xF0:
            c = 0
            extra = 4
        else:
            # First bytes 0xF8-0xFF match no length form; as before, the byte
            # value itself is returned unchanged.
            extra = 0
        for b in self._recvExact(extra):
            c = (c << 8) + b
        return c

    def writeStr(self, str):
        """Send the text *str* encoded as UTF-8, with no length prefix."""
        # Encode once and let writeByte track progress in bytes; slicing the
        # text by the byte count returned from send() would go wrong for
        # non-ASCII characters.
        self.writeByte(str.encode('utf-8'))

    def writeByte(self, str):
        """Send the bytes object *str* in full, retrying after short writes.

        Raises RuntimeError if the socket reports that zero bytes were sent.
        """
        n = 0
        while n < len(str):
            r = self.sk.send(str[n:])
            if r == 0:
                raise RuntimeError("connection closed by remote end")
            n += r

    def readStr(self, length):
        """Read exactly *length* bytes and return them decoded as UTF-8 text.

        Undecodable bytes are replaced rather than raising.  The result is
        always ``str``.
        """
        return self._recvExact(length).decode('utf-8', 'replace')

    def _recvExact(self, length):
        """Return exactly *length* raw bytes from the socket.

        Raises RuntimeError if the peer closes the connection first.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.sk.recv(remaining)
            if chunk == b'':
                raise RuntimeError("connection closed by remote end")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)
def main():
    """Interactive RouterOS API console: ``script host [user [password]]``.

    Connects to port 8728 on *host*, logs in (default ``admin`` with an empty
    password) and then relays sentences typed on stdin: each non-empty line
    adds a word, and an empty line sends the collected sentence.  Replies are
    read as they arrive.  Returns when login fails or stdin reaches EOF.
    """
    # Imported locally because only this entry point needs them.
    import queue
    import threading

    if len(sys.argv) < 2:
        print('usage: ' + sys.argv[0] + ' host [user [password]]')
        sys.exit(1)

    # Try every address the host resolves to and keep the first that connects.
    s = None
    for res in socket.getaddrinfo(sys.argv[1], "8728", socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        try:
            s = socket.socket(af, socktype, proto)
        except socket.error:
            s = None
            continue
        try:
            s.connect(sa)
        except socket.error:
            s.close()
            s = None
            continue
        break
    if s is None:
        print('could not open socket')
        sys.exit(1)

    try:
        apiros = ApiRos(s)

        # Use the default username and password if not specified.
        if len(sys.argv) == 4:
            user, passw = sys.argv[2], sys.argv[3]
        elif len(sys.argv) == 3:
            user, passw = sys.argv[2], ""
        else:
            user, passw = "admin", ""
        if not apiros.login(user, passw):
            return

        # select() accepts only sockets on Windows, so passing sys.stdin to it
        # raises WinError 10038.  Stdin is read in a helper thread instead and
        # the lines are handed over through a queue; all socket I/O stays on
        # this thread.
        lines = queue.Queue()

        def forward_stdin():
            for line in sys.stdin:
                lines.put(line.rstrip('\n'))
            lines.put(None)  # EOF marker

        threading.Thread(target=forward_stdin, daemon=True).start()

        inputsentence = []
        while True:
            # The short timeout lets queued input be handled promptly.
            readable, _, _ = select.select([s], [], [], 0.1)
            if readable:
                # Something arrived on the socket: consume one sentence.
                apiros.readSentence()

            while True:
                try:
                    line = lines.get_nowait()
                except queue.Empty:
                    break
                if line is None:
                    # Stdin is exhausted; stop instead of looping forever.
                    return
                if line == '':
                    # An empty line sends the sentence and starts a new one.
                    apiros.writeSentence(inputsentence)
                    inputsentence = []
                else:
                    inputsentence.append(line)
    finally:
        s.close()


if __name__ == '__main__':
    main()
推荐阅读
- c# - 如何只使用一种方法来填充 ComBobox 和 ListBox 的项目?
- python-3.x - 当给定一个太长的数组来定位方法时,为什么访问 MultiIndexed DataFrame 不会失败?
- lua - 防止在没有 file:close() 的情况下多次写入文件
- javascript - 如何访问 Object 的 getter 和 setter 函数?
- python - 在 Tensorflow 2.0 中从路径部分创建标签,而路径不是标签名称
- makefile - 为什么在我的 Makefile 中包含一个文件会更改我的 Makefile-directory 变量?
- python - numba 中的多维 numpy.expand_dims
- angular - 用玩笑测试 NGRX/效果 - 测试总是通过
- c++ - xtensor:选择具有特定列值的行
- php - phpunit mock 不计算函数调用