Trụ Sở Chính
Tầng 6, Tòa nhà AC, 78 Duy Tân, Cầu Giấy, Hà Nội
[+84]24-7303-9996
[email protected]/ [email protected]
VTI - Văn phòng HL
Tầng 9, Tòa nhà HL, 82 Duy Tân, Cầu Giấy, Hà Nội
[+84]24-7303-9996
[email protected]/ [email protected]
VTI - Văn phòng 3A
Tầng 8, Tòa nhà 3A, 82 Duy Tân, Cầu Giấy, Hà Nội
[+84]24-7303-9996
[email protected]/ [email protected]
VTI Tokyo
Tầng 4, Tòa nhà T&T, 8-21 Tomihisacho Shinjuku-ku, Tokyo, 162-0067 Japan
[+81]3-6261-5698
[email protected]
VTI Osaka
Tầng 2, Tòa nhà Hiranomachi Century, 2-5-8 Hiranomachi, Chuo-ku, Osaka
[+81]6-7878-5841
[email protected]
Tiếp series lập trình Python, xin giới thiệu với các bạn đoạn code Python 3 sau sử dụng thư viện paramiko có nhiệm vụ kết nối SSH tới 1 Remote Linux Server và chạy command “cat /etc/*-release” trên đó.
Kết nối SSH và chạy command sử dụng Python 3
import paramiko ssh = paramiko.SSHClient[] # Auto add host to known hosts ssh.set_missing_host_key_policy[paramiko.AutoAddPolicy[]] # Connect to server ssh.connect["192.168.2.100", username="root", password="123456"] # Do command [ssh_stdin, ssh_stdout, ssh_stderr] = ssh.exec_command["cat /etc/*-release"] # Get status code of command exit_status = ssh_stdout.channel.recv_exit_status[] # Print status code print ["exit status: %s" % exit_status] # Print content for line in ssh_stdout.readlines[]: print[line.rstrip[]] # Close ssh connect ssh.close[]
Kết quả chạy thử:
Cài đặt paramiko module cho Python 3
Để chạy đoạn code trên cần cài đặt Python Module/Package là paramiko, chúng ta cài thông qua pip3
pip3 install paramiko
Khắc phục một số lỗi khi cài đặt paramiko
Fix lỗi:
CryptographyDeprecationWarning: encode_point has been deprecated on EllipticCurvePublicNumbers and will be removed in a future version. Please use EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed point encoding.
pip3 install cryptography==2.4.2
Nguồn: vinasupport.com
Cách tôi đã giải quyết vấn đề này là với trình quản lý ngữ cảnh. Điều này sẽ đảm bảo rằng các lệnh đang chạy dài của tôi bị hủy bỏ. Logic chính là quấn để bắt chước SSHClient.exec_command nhưng nắm bắt kênh đã tạo và sử dụng một Timer
sẽ đóng kênh đó nếu lệnh chạy quá
lâu.
import paramiko
import threading
class TimeoutChannel:
def __init__[self, client: paramiko.SSHClient, timeout]:
self.expired = False
self._channel: paramiko.channel = None
self.client = client
self.timeout = timeout
def __enter__[self]:
self.timer = threading.Timer[self.timeout, self.kill_client]
self.timer.start[]
return self
def __exit__[self, exc_type, exc_val, exc_tb]:
print["Exited Timeout. Timed out:", self.expired]
self.timer.cancel[]
if exc_val:
return False # Make sure the exceptions are re-raised
if self.expired:
raise TimeoutError["Command timed out"]
def kill_client[self]:
self.expired = True
print["Should kill client"]
if self._channel:
print["We have a channel"]
self._channel.close[]
def exec[self, command, bufsize=-1, timeout=None, get_pty=False, environment=None]:
self._channel = self.client.get_transport[].open_session[timeout=timeout]
if get_pty:
self._channel.get_pty[]
self._channel.settimeout[timeout]
if environment:
self._channel.update_environment[environment]
self._channel.exec_command[command]
stdin = self._channel.makefile_stdin["wb", bufsize]
stdout = self._channel.makefile["r", bufsize]
stderr = self._channel.makefile_stderr["r", bufsize]
return stdin, stdout, stderr
Để sử dụng mã, nó khá đơn giản bây giờ, ví dụ đầu tiên sẽ đưa ra một TimeoutError
ssh = paramiko.SSHClient[]
ssh.connect['hostname', username='user', password='pass']
with TimeoutChannel[ssh, 3] as c:
ssh_stdin, ssh_stdout, ssh_stderr = c.exec["cat"] # non-blocking
exit_status = ssh_stdout.channel.recv_exit_status[] # block til done, will never complete because cat wants input
Mã này sẽ hoạt động tốt [trừ khi máy chủ đang tải quá tải!]
ssh = paramiko.SSHClient[]
ssh.connect['hostname', username='user', password='pass']
with TimeoutChannel[ssh, 3] as c:
ssh_stdin, ssh_stdout, ssh_stderr = c.exec["uptime"] # non-blocking
exit_status = ssh_stdout.channel.recv_exit_status[] # block til done, will complete quickly
print[ssh_stdout.read[].decode["utf8"]] # Show results
0 hữu ích 0 bình luận chia sẻ