1. 程式人生 > >使用python 實現icmp測試主機存活性

使用python 實現icmp測試主機存活性

python icmp

代碼:

#!/usr/bin/env python#coding:utf-8import os, sys, socket, struct, select, time# From /usr/include/linux/icmp.h; your milage may vary.ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.def checksum(source_string): """ I‘m not too confident that this is right but testing seems to suggest that it gives the same answers as in_cksum in ping.c
""" sum = 0 countTo = (len(source_string)/2)*2 count = 0 while count<countTo: thisVal = ord(source_string[count + 1])*256 + ord(source_string[count]) sum = sum + thisVal sum = sum & 0xffffffff # Necessary? count = count + 2 if countTo<len(source_string): sum = sum + ord(source_string[
len(source_string) - 1]) sum = sum & 0xffffffff # Necessary? sum = (sum >> 16) + (sum & 0xffff) sum = sum + (sum >> 16) answer = ~sum answer = answer & 0xffff # Swap bytes. Bugger me if I know why. answer = answer >> 8 | (answer << 8 & 0xff00) return answer
def receive_one_ping(my_socket, ID, timeout): """ receive the ping from the socket. """ timeLeft = timeout while True: startedSelect = time.time() whatReady = select.select([my_socket], [], [], timeLeft) howLongInSelect = (time.time() - startedSelect) if whatReady[0] == []: # Timeout return timeReceived = time.time() recPacket, addr = my_socket.recvfrom(1024) icmpHeader = recPacket[20:28] type, code, checksum, packetID, sequence = struct.unpack( "bbHHh", icmpHeader ) if packetID == ID: bytesInDouble = struct.calcsize("d") timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0] return timeReceived - timeSent timeLeft = timeLeft - howLongInSelect if timeLeft <= 0: returndef send_one_ping(my_socket, dest_addr, ID): """ Send one ping to the given >dest_addr<. """ dest_addr = socket.gethostbyname(dest_addr) # Header is type (8), code (8), checksum (16), id (16), sequence (16) my_checksum = 0 # Make a dummy heder with a 0 checksum. header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1) #壓包 #a1 = struct.unpack("bbHHh",header) #my test bytesInDouble = struct.calcsize("d") data = (192 - bytesInDouble) * "Q" data = struct.pack("d", time.time()) + data # Calculate the checksum on the data and the dummy header. my_checksum = checksum(header + data) # Now that we have the right checksum, we put that in. It‘s just easier # to make up a new header than to stuff it into the dummy. header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1) packet = header + data my_socket.sendto(packet, (dest_addr, 1)) # Don‘t know about the 1def do_one(dest_addr, timeout): """ Returns either the delay (in seconds) or none on timeout. """ icmp = socket.getprotobyname("icmp") try: my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) except socket.error, (errno, msg): if errno == 1: # Operation not permitted msg = msg + ( " - Note that ICMP messages can only be sent from processes" " running as root." ) raise socket.error(msg) raise # raise the original error my_ID = os.getpid() & 0xFFFF send_one_ping(my_socket, dest_addr, my_ID) delay = receive_one_ping(my_socket, my_ID, timeout) my_socket.close() return delaydef verbose_ping(dest_addr, timeout = 2, count = 100): """ Send >count< ping to >dest_addr< with the given >timeout< and display the result. """ for i in xrange(count): print "ping %s..." % dest_addr, try: delay = do_one(dest_addr, timeout) except socket.gaierror, e: print "failed. (socket error: ‘%s‘)" % e[1] break if delay == None: print "failed. (timeout within %ssec.)" % timeout else: delay = delay * 1000 print "get ping in %0.4fms" % delayif __name__ == ‘__main__‘:

verbose_ping("www.163.com",2,1)


用到的模塊解析:

struct:

最近在學習python網絡編程這一塊,在寫簡單的socket通信代碼時,遇到了struct這個模塊的使用,當時不太清楚這到底有和作用,後來查閱了相關資料大概了解了,在這裏做一下簡單的總結。

了解c語言的人,一定會知道struct結構體在c語言中的作用,它定義了一種結構,裏面包含不同類型的數據(int,char,bool等等),方便對某一結構對象進行處理。而在網絡通信當中,大多傳遞的數據是以二進制流(binary data)存在的。當傳遞字符串時,不必擔心太多的問題,而當傳遞諸如int、char之類的基本數據的時候,就需要有一種機制將某些特定的結構體類型打包成二進制流的字符串然後再網絡傳輸,而接收端也應該可以通過某種機制進行解包還原出原始的結構體數據。python中的struct模塊就提供了這樣的機制,該模塊的主要作用就是對python基本類型值與用python字符串格式表示的C struct類型間的轉化(This module performs conversions between Python values and C structs represented as Python strings.)。stuct模塊提供了很簡單的幾個函數,下面寫幾個例子。

struct提供用format specifier方式對數據進行打包和解包(Packing and Unpacking)。例如:

123456789101112import structimport binasciivalues = (1, ‘abc‘, 2.7)s = struct.Struct(‘I3sf‘)packed_data = s.pack(*values)unpacked_data = s.unpack(packed_data) print ‘Original values:‘, valuesprint ‘Format string :‘, s.formatprint ‘Uses :‘, s.size, ‘bytes‘print ‘Packed Value :‘, binascii.hexlify(packed_data)print ‘Unpacked Type :‘, type(unpacked_data), ‘ Value:‘, unpacked_data

輸出:

Original values: (1, ‘abc‘, 2.7)
Format string : I3sf
Uses : 12 bytes
Packed Value : 0100000061626300cdcc2c40
Unpacked Type : <type ‘tuple‘> Value: (1, ‘abc‘, 2.700000047683716)

代碼中,首先定義了一個元組數據,包含int、string、float三種數據類型,然後定義了struct對象,並制定了format‘I3sf’,I 表示int,3s表示三個字符長度的字符串,f 表示 float。最後通過struct的pack和unpack進行打包和解包。通過輸出結果可以發現,value被pack之後,轉化為了一段二進制字節串,而unpack可以把該字節串再轉換回一個元組,但是值得註意的是對於float的精度發生了改變,這是由一些比如操作系統等客觀因素所決定的。打包之後的數據所占用的字節數與C語言中的struct十分相似。

select 模塊:

Python中的select模塊專註於I/O多路復用,提供了select poll epoll三個方法(其中後兩個在Linux中可用,windows僅支持select),另外也提供了kqueue方法(freeBSD系統)

select方法:

進程指定內核監聽哪些文件描述符(最多監聽1024個fd)的哪些事件,當沒有文件描述符事件發生時,進程被阻塞;當一個或者多個文件描述符事件發生時,進程被喚醒。

當我們調用select()時:

  1 上下文切換轉換為內核態

  2 將fd從用戶空間復制到內核空間

  3 內核遍歷所有fd,查看其對應事件是否發生

  4 如果沒發生,將進程阻塞,當設備驅動產生中斷或者timeout時間後,將進程喚醒,再次進行遍歷

  5 返回遍歷後的fd

  6 將fd從內核空間復制到用戶空間

fd:file descriptor 文件描述符

技術分享

fd_r_list, fd_w_list, fd_e_list = .秒,之後返回三個空列表,如果監聽的句柄有變化,則直接執行。




本文出自 “技術改變命運” 博客,請務必保留此出處http://linux1989.blog.51cto.com/9347464/1925082

使用python 實現icmp測試主機存活性