1. 程式人生 > python 寫的簡易小ntp 伺服器

python 寫的簡易小ntp 伺服器

ntp.py

# -*- coding: UTF-8 -*-
import wx
import datetime
import socket
import struct
import time
import Queue
import mutex
import threading
import select
import win32api

# Module-wide state shared by the server threads.

# Release identifier shown in the window title and in the start-up log.
ntpserverversion = '2018.03.12'
# Work queue of (data, addr, recvTimestamp) tuples consumed by WorkThread.
taskQueue = Queue.Queue()
# Set to True to ask every worker thread to leave its loop.
stopFlag = False
# Hours added to the timestamps handed out to clients.
utc_offset = 8
# Stratum advertised in replies to clients.
stratum = 1
# 1 starts the SynTimeThread that polls the upstream server.
syntime = 0
# Address of the upstream NTP server.
serverip = '0.0.0.0'
# UDP port of the upstream server. This is an int because it is used as the
# port of a socket address and formatted with %d in log messages.
serverport = 123
# Exponent of the polling period: the poller sleeps 2**poll_interval seconds.
poll_interval = 6
# NTP protocol version placed in requests sent upstream.
version = 3
# Polls left before the local clock is reported as unsynchronized.
retry_cnt = 5

def loadsettingsjsonfile():
	"""Load profile/settings.json into the module-level settings.

	The keys stratum, utc_offset, syntime, serverip, serverport,
	poll_interval and version are copied, in that order, into the module
	globals of the same name. A missing key raises KeyError after the
	preceding keys have already been applied.
	"""
	import json
	with open('profile/settings.json', 'r') as handle:
		cfg = json.loads(handle.read())
	module_vars = globals()
	for key in ('stratum', 'utc_offset', 'syntime', 'serverip',
			'serverport', 'poll_interval', 'version'):
		module_vars[key] = cfg[key]

def system_to_ntp_time(timestamp):
	"""Shift a system-epoch timestamp onto the NTP epoch.

	Parameters:
	timestamp -- seconds counted from the system epoch

	Returns:
	the same instant counted from the NTP epoch
	"""
	epoch_gap = NTP.NTP_DELTA
	return timestamp + epoch_gap
def ntp_to_system_time(timestamp):
	"""Shift an NTP-epoch timestamp back onto the system epoch.

	Parameters:
	timestamp -- seconds counted from the NTP epoch

	Returns:
	the same instant counted from the system epoch
	"""
	epoch_gap = NTP.NTP_DELTA
	return timestamp - epoch_gap
	
def setSystemTime(timestamp):
	"""Set the Windows system clock from a UTC timestamp.

	Parameters:
	timestamp -- seconds since the system epoch, fractional part included
	"""
	parts = time.gmtime(timestamp)
	# The milliseconds must be carried over; without them two consecutive
	# adjustments can end up several hundred ms apart.
	tm_ms = int((timestamp - int(timestamp)) * 1000)
	# time.gmtime numbers weekdays from Monday=0, while the dayOfWeek
	# argument follows SYSTEMTIME and starts at Sunday=0.
	day_of_week = (parts.tm_wday + 1) % 7
	win32api.SetSystemTime(
		parts.tm_year, parts.tm_mon, day_of_week, parts.tm_mday,
		parts.tm_hour, parts.tm_min, parts.tm_sec, tm_ms)
	
def _to_int(timestamp):
	"""Return the integral part of a timestamp.

	Parameters:
	timestamp -- NTP timestamp

	Retuns:
	integral part
	"""
	return int(timestamp)

def _to_frac(timestamp, n=32):
	"""Express the sub-second part of a timestamp as an n-bit fixed-point value.

	Parameters:
	timestamp -- NTP timestamp
	n         -- number of bits used for the fractional part

	Returns:
	the fractional part scaled by 2**n and truncated to an int
	"""
	remainder = timestamp - _to_int(timestamp)
	scale = 2 ** n
	return int(remainder * scale)

def _to_time(integ, frac, n=32):
	"""Return a timestamp from an integral and fractional part.

	Parameters:
	integ -- integral part
	frac  -- fractional part
	n     -- number of bits of the fractional part

	Retuns:
	timestamp
	"""
	return integ + float(frac)/2**n



class NTPException(Exception):
	"""Error raised by this module when an NTP packet cannot be packed or parsed."""


class NTP:
	"""Namespace for NTP constants and the synchronisation state shared by the threads."""

	# Date the system clock counts from, as reported by time.gmtime(0).
	_SYSTEM_EPOCH = datetime.date(*time.gmtime(0)[:3])
	# Date the NTP clock counts from.
	_NTP_EPOCH = datetime.date(1900, 1, 1)
	# Seconds separating the NTP epoch from the system epoch.
	NTP_DELTA = 24 * 3600 * (_SYSTEM_EPOCH - _NTP_EPOCH).days

	# Timestamps remembered between packets; all start out unset (0).
	ref_timestamp = recv_timestamp = orig_timestamp = tx_timestamp = 0
	# High and low 32-bit words of the originate timestamp.
	orig_timestamp_h = orig_timestamp_l = 0
	# Reference identifier. When stratum is 1 this should be a string
	# rather than an IP address.
	ref_id = 0

	# Reference identifier -> description.
	REF_ID_TABLE = dict([
		('DNC', "DNC routing protocol"),
		('NIST', "NIST public modem"),
		('TSP', "TSP time protocol"),
		('DTS', "Digital Time Service"),
		('ATOM', "Atomic clock (calibrated)"),
		('VLF', "VLF radio (OMEGA, etc)"),
		('callsign', "Generic radio"),
		('LORC', "LORAN-C radionavidation"),
		('GOES', "GOES UHF environment satellite"),
		('GPS', "GPS UHF satellite positioning"),
	])

	# Stratum value -> description (position in the tuple is the value).
	STRATUM_TABLE = dict(enumerate((
		"unspecified",
		"primary reference",
	)))

	# Mode value -> description (position in the tuple is the value).
	MODE_TABLE = dict(enumerate((
		"unspecified",
		"symmetric active",
		"symmetric passive",
		"client",
		"server",
		"broadcast",
		"reserved for NTP control messages",
		"reserved for private use",
	)))

	# Leap indicator value -> description (position in the tuple is the value).
	LEAP_TABLE = dict(enumerate((
		"no warning",
		"last minute has 61 seconds",
		"last minute has 59 seconds",
		"alarm condition (clock not synchronized)",
	)))

class NTPPacket:
	"""NTP packet class.

	This represents one NTP packet and converts it to and from the
	48-byte buffer exchanged over the socket.

	The originate timestamp is kept twice: as a float (orig_timestamp) and
	as its two raw 32-bit words (orig_timestamp_h, orig_timestamp_l). Only
	the raw words are written by to_data(), which lets a peer's transmit
	timestamp be echoed without going through a float.
	"""

	# Network byte order: flags byte, stratum, poll, signed precision,
	# followed by eleven unsigned 32-bit words.
	_PACKET_FORMAT = "!B B B b 11I"

	def __init__(self, version=3, mode=3, tx_timestamp=0):
		"""Constructor.

		Parameters:
		version      -- NTP version
		mode         -- packet mode (client, server)
		tx_timestamp -- packet transmit timestamp
		"""
		# leap second indicator
		self.leap = 0
		# protocol version
		self.version = version
		# packet mode
		self.mode = mode
		# stratum
		self.stratum = 2
		# poll interval
		self.poll = 6
		# precision
		self.precision = -6
		# root delay
		self.root_delay = 0
		# root dispersion
		self.root_dispersion = 0
		# reference clock identifier
		self.ref_id = 0
		# reference timestamp
		self.ref_timestamp = 0
		# originate timestamp, as a float and as raw high/low words
		self.orig_timestamp = 0
		self.orig_timestamp_h = 0
		self.orig_timestamp_l = 0
		# receive timestamp
		self.recv_timestamp = 0
		# transmit timestamp, as a float and as raw high/low words
		self.tx_timestamp = tx_timestamp
		self.tx_timestamp_h = 0
		self.tx_timestamp_l = 0

	def to_data(self):
		"""Convert this NTPPacket to a buffer that can be sent over a socket.

		The originate timestamp is written from orig_timestamp_h and
		orig_timestamp_l; the float orig_timestamp is not used here.

		Returns:
		buffer representing this packet

		Raises:
		NTPException -- in case of invalid field
		"""
		try:
			packed = struct.pack(NTPPacket._PACKET_FORMAT,
				# leap (2 bits), version (3 bits) and mode (3 bits) share one byte
				(self.leap << 6 | self.version << 3 | self.mode),
				self.stratum,
				self.poll,
				self.precision,
				# root delay and dispersion are 16.16 fixed-point values
				_to_int(self.root_delay) << 16 | _to_frac(self.root_delay, 16),
				_to_int(self.root_dispersion) << 16 |
				_to_frac(self.root_dispersion, 16),
				self.ref_id,
				_to_int(self.ref_timestamp),
				_to_frac(self.ref_timestamp),
				self.orig_timestamp_h,
				self.orig_timestamp_l,
				_to_int(self.recv_timestamp),
				_to_frac(self.recv_timestamp),
				_to_int(self.tx_timestamp),
				_to_frac(self.tx_timestamp))
		except struct.error:
			raise NTPException("Invalid NTP packet fields.")
		return packed

	def from_data(self, data):
		"""Populate this instance from a NTP packet payload received from
		the network.

		Parameters:
		data -- buffer payload; only the leading bytes covered by the
		        packet format are read

		Raises:
		NTPException -- in case of invalid packet format
		"""
		try:
			unpacked = struct.unpack(NTPPacket._PACKET_FORMAT,
					data[0:struct.calcsize(NTPPacket._PACKET_FORMAT)])
		except struct.error:
			raise NTPException("Invalid NTP packet.")
		self.leap = unpacked[0] >> 6
		self.version = unpacked[0] >> 3 & 0x7
		self.mode = unpacked[0] & 0x7
		self.stratum = unpacked[1]
		self.poll = unpacked[2]
		self.precision = unpacked[3]
		self.root_delay = float(unpacked[4])/2**16
		self.root_dispersion = float(unpacked[5])/2**16
		self.ref_id = unpacked[6]
		self.ref_timestamp = _to_time(unpacked[7], unpacked[8])
		self.orig_timestamp = _to_time(unpacked[9], unpacked[10])
		# Keep the raw words too: to_data() serializes the originate
		# timestamp from them, so without this a parsed packet would be
		# written back with a zero originate timestamp.
		self.orig_timestamp_h = unpacked[9]
		self.orig_timestamp_l = unpacked[10]
		self.recv_timestamp = _to_time(unpacked[11], unpacked[12])
		self.tx_timestamp = _to_time(unpacked[13], unpacked[14])
		self.tx_timestamp_h = unpacked[13]
		self.tx_timestamp_l = unpacked[14]

class RecvThread(threading.Thread):
	"""Thread that reads datagrams from the socket and queues them for WorkThread."""

	def __init__(self,socket):
		"""Remember the socket to read from.

		Parameters:
		socket -- bound datagram socket shared with WorkThread
		"""
		threading.Thread.__init__(self)
		self.socket = socket

	def run(self):
		"""Receive datagrams until stopFlag is set.

		The socket is polled with a one second timeout so that stopFlag is
		re-checked regularly. Every datagram is queued on taskQueue as
		(data, addr, recvTimestamp), with recvTimestamp taken right after
		the read and expressed in NTP time.
		"""
		global taskQueue,stopFlag
		while not stopFlag:
			rlist,wlist,elist = select.select([self.socket],[],[],1)
			for tempSocket in rlist:
				try:
					data,addr = tempSocket.recvfrom(1024)
					recvTimestamp = system_to_ntp_time(time.time())
					taskQueue.put((data,addr,recvTimestamp))
					wx.LogMessage('Received a packet from %s:%d' % (addr[0],addr[1]))
				except Exception as err:
					# A failed read must not stop the receiver, but it is
					# reported instead of vanishing. %r keeps the text ASCII
					# even when the error message is not.
					wx.LogMessage('Failed to receive a packet: %r' % (err,))

class WorkThread(threading.Thread):
	"""Thread that consumes taskQueue and does all NTP processing.

	Every queue item is a (data, addr, recvTimestamp) tuple. Three kinds of
	work are handled:
	- the internal 'synflag' marker queued by SynTimeThread: send a request
	  to the upstream server;
	- a mode 3 (client) packet: answer it with a mode 4 reply;
	- a mode 4 (server) packet: use it to correct the local clock.
	"""

	def __init__(self,socket):
		"""Remember the socket used for sending.

		Parameters:
		socket -- bound datagram socket shared with RecvThread
		"""
		threading.Thread.__init__(self)
		self.socket = socket

	def run(self):
		"""Process queue items until stopFlag is set."""
		global taskQueue,stopFlag
		while not stopFlag:
			try:
				data,addr,recvTimestamp = taskQueue.get(timeout=1)
			except Queue.Empty:
				# Nothing arrived within a second; loop to re-check stopFlag.
				continue
			try:
				self._dispatch(data, addr, recvTimestamp)
			except Exception as err:
				# One bad packet must not stop the server, but the failure is
				# reported instead of being dropped silently. %r keeps the
				# message ASCII whatever the error text contains.
				wx.LogMessage('Failed to process a packet from %r: %r' % (addr, err))

	def _dispatch(self, data, addr, recvTimestamp):
		"""Route one queue item to the handler matching its kind."""
		# The marker is honoured only together with the address that
		# SynTimeThread queues it with. A datagram whose payload happens to
		# be 'synflag' is parsed as a packet instead, so it cannot make this
		# host send requests to its sender.
		if data == 'synflag' and addr == (serverip,serverport):
			self._send_sync_request(addr)
			return
		recvPacket = NTPPacket()
		recvPacket.from_data(data)
		if recvPacket.mode == 3:
			self._answer_client(recvPacket, addr, recvTimestamp)
		elif recvPacket.mode == 4:
			self._apply_server_reply(recvPacket, addr, recvTimestamp)

	def _send_sync_request(self, addr):
		"""Send a request to the upstream server at addr and record its transmit time."""
		sendPacket = NTPPacket(version=version)
		sendPacket.stratum = 8
		sendPacket.poll = poll_interval
		sendPacket.ref_timestamp = NTP.ref_timestamp
		sendPacket.orig_timestamp_h = NTP.orig_timestamp_h
		sendPacket.orig_timestamp_l = NTP.orig_timestamp_l
		sendPacket.recv_timestamp = NTP.recv_timestamp
		sendPacket.ref_id = NTP.ref_id
		sendPacket.tx_timestamp = system_to_ntp_time(time.time())
		self.socket.sendto(sendPacket.to_data(),addr)
		NTP.tx_timestamp = sendPacket.tx_timestamp
		wx.LogMessage('Sended a request to %s:%d' % (addr[0],addr[1]))

	def _answer_client(self, recvPacket, addr, recvTimestamp):
		"""Reply to a client request with a mode 4 packet.

		utc_offset hours are added to the reference, receive and transmit
		timestamps of the reply.
		"""
		# Without upstream synchronisation the local clock is advertised as
		# the reference: stamped 30 s in the past, with 127.0.0.1 as its id.
		if syntime != 1 or NTP.ref_timestamp ==0 or retry_cnt == 0 :
			NTP.ref_timestamp = system_to_ntp_time(time.time()-30)
			NTP.ref_id = iptoint('127.0.0.1')
		sendPacket = NTPPacket(version=recvPacket.version,mode=4)
		sendPacket.stratum = stratum
		sendPacket.ref_id = NTP.ref_id
		sendPacket.ref_timestamp = NTP.ref_timestamp + utc_offset*3600
		# Echo the client's transmit timestamp word for word.
		sendPacket.orig_timestamp_h = recvPacket.tx_timestamp_h
		sendPacket.orig_timestamp_l = recvPacket.tx_timestamp_l
		sendPacket.recv_timestamp = recvTimestamp+utc_offset*3600
		sendPacket.tx_timestamp = system_to_ntp_time(time.time()+utc_offset*3600)
		self.socket.sendto(sendPacket.to_data(),addr)
		wx.LogMessage('Sended a response to %s:%d' % (addr[0],addr[1]))

	def _apply_server_reply(self, recvPacket, addr, recvTimestamp):
		"""Use a reply from the upstream server to correct the local clock."""
		global retry_cnt
		# Only the configured upstream server may move the system clock, and
		# only while upstream polling is enabled; every other mode 4 packet
		# is unsolicited.
		if syntime != 1 or addr[0] != serverip:
			wx.LogMessage('Ignored a server reply from unexpected source %s:%d' % (addr[0],addr[1]))
			return
		retry_cnt = 5
		# clock offset between the server and this host
		delta = (recvPacket.recv_timestamp + recvPacket.tx_timestamp - recvPacket.orig_timestamp - recvTimestamp) / 2
		# round-trip delay of the exchange, server processing time excluded
		latency = (recvTimestamp - recvPacket.orig_timestamp) - (recvPacket.tx_timestamp - recvPacket.recv_timestamp)
		if latency >= 0.4:
			# The round trip took too long for the offset to be trusted.
			wx.LogMessage(u'本次報文往返時延太大,忽略!時延:'+ str(latency*1000) + ' ms')
			return
		if (abs(delta) > pow(2,-6)):
			# The offset is large enough to step the clock.
			current_timeStamp = time.time() + delta
			setSystemTime(current_timeStamp)
			NTP.ref_timestamp = system_to_ntp_time(current_timeStamp)
			NTP.ref_id = iptoint(serverip)
			offsetMessage = 'Need to change,offset:'+str(delta*1000)+' ms'
		else :
			# Already close enough; only initialise the reference data once.
			if NTP.ref_timestamp == 0 :
				NTP.ref_timestamp = system_to_ntp_time(time.time())
				NTP.ref_id = iptoint(serverip)
			offsetMessage = u'Don\'t need to change,offset:' +str(delta*1000)+' ms'
		# Remember the server's transmit words and our receive time; they are
		# placed in the next request sent upstream.
		NTP.orig_timestamp_h = recvPacket.tx_timestamp_h
		NTP.orig_timestamp_l = recvPacket.tx_timestamp_l
		NTP.recv_timestamp = recvTimestamp
		wx.LogMessage(offsetMessage)
		wx.LogMessage(u'Local Clock is synchronized  O(∩_∩)O 哈哈~!!')

def iptoint(ipaddr):
	"""Convert a dotted IP address string to an integer.

	The dot-separated fields are read as base-256 digits, most significant
	first, e.g. '127.0.0.1' -> 2130706433.
	"""
	value = 0
	for field in ipaddr.split('.'):
		value = value * 256 + int(field)
	return value
	
class SynTimeThread(threading.Thread):
	"""Thread that periodically asks WorkThread to poll the upstream server."""

	def __init__(self):
		threading.Thread.__init__(self)

	def run(self):
		"""Queue one 'synflag' item every 2**poll_interval seconds until stopFlag is set.

		retry_cnt is decremented on every poll. Once it has reached 0 the
		clock is reported as unsynchronized and the remembered reference id,
		originate and receive timestamps are cleared before each further poll.
		"""
		global taskQueue,stopFlag,poll_interval,retry_cnt
		while not stopFlag:
			if (retry_cnt == 0):
				wx.LogMessage('Local Clock is unsynchronized !! please check the ntp server!!!')
				NTP.ref_id = 0
				# NTP.ref_timestamp is left untouched here.
				NTP.orig_timestamp = 0
				# The request is built from the raw originate words, so they
				# have to be cleared as well for the reset to reach the wire.
				NTP.orig_timestamp_h = 0
				NTP.orig_timestamp_l = 0
				NTP.recv_timestamp = 0
			else :
				retry_cnt -=1
			taskQueue.put(('synflag',(serverip,serverport),NTP.ref_timestamp))
			self._wait(pow(2,poll_interval))

	def _wait(self, seconds):
		"""Sleep for seconds, in steps of at most one second, returning early once stopFlag is set."""
		# A countdown is used rather than a wall-clock deadline because this
		# program itself may step the system clock while the wait is running.
		remaining = seconds
		while remaining > 0 and not stopFlag:
			step = min(1, remaining)
			time.sleep(step)
			remaining -= step
				
				
def ntpserverstart():
	"""Start the NTP service.

	Stops the Windows Time service if it is running, loads the settings
	file, binds UDP port 123 on all interfaces and starts the worker and
	receiver threads, plus the upstream poller when syntime is 1.
	"""
	import os
	# The w32time service listens on port 123; if it is running it has to be
	# stopped so that the port is released for this server.
	query = os.popen('sc query w32time')
	try:
		szResult = query.read()
	finally:
		query.close()
	if 'RUNNING' in szResult:
		stopper = os.popen('sc stop w32time')
		try:
			# Reading to end-of-file blocks until sc has closed its output,
			# so the stop request has been issued before the bind below.
			stopper.read()
		finally:
			stopper.close()

	# Load the configuration file.
	loadsettingsjsonfile()

	listenIp = '0.0.0.0'
	listenPort = 123
	mysocket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
	mysocket.bind((listenIp,listenPort))
	wx.LogMessage('ntp server is started!!')
	wx.LogMessage('ntp server version is: '+ntpserverversion)
	wx.LogMessage('ntp server is design by flycony')
	wx.LogMessage('local listen addr: %s:%s' % mysocket.getsockname() )
	# Thread-1: processes queued packets and sends all replies and requests.
	workThread = WorkThread(mysocket)
	workThread.start()
	# Thread-2: receives datagrams and queues them.
	recvThread = RecvThread(mysocket)
	recvThread.start()
	if (syntime == 1) :
		# Thread-3: periodically triggers a poll of the upstream server.
		syntimeThread = SynTimeThread()
		syntimeThread.start()

def ntpserverstop():
	"""Raise the shared stop flag that the server threads check in their loops."""
	global stopFlag
	stopFlag = True

class TaskBarIcon(wx.TaskBarIcon):
	"""Task bar icon with a popup menu that controls the main frame."""

	# Menu command ids, allocated once when the class is created.
	ID_About = wx.NewId()
	ID_Minshow = wx.NewId()
	ID_Maxshow = wx.NewId()
	ID_Closeshow = wx.NewId()

	def __init__(self, frame):
		"""Install the icon and wire up the click and menu handlers.

		Parameters:
		frame -- the main window this icon shows, hides and closes
		"""
		wx.TaskBarIcon.__init__(self)
		self.frame = frame
		# ntp.ico is the icon file shown in the task bar.
		tray_icon = wx.Icon(name='profile/icon/ntp.ico', type=wx.BITMAP_TYPE_ICO)
		self.SetIcon(tray_icon, 'ntp server')
		# The toggle handler is bound to a plain left button press.
		self.Bind(wx.EVT_TASKBAR_LEFT_DOWN, self.OnTaskBarLeftDClick)
		menu_handlers = (
			(self.OnAbout, self.ID_About),
			(self.OnMinshow, self.ID_Minshow),
			(self.OnMaxshow, self.ID_Maxshow),
			(self.OnCloseshow, self.ID_Closeshow),
		)
		for handler, menu_id in menu_handlers:
			self.Bind(wx.EVT_MENU, handler, id=menu_id)

	def OnTaskBarLeftDClick(self, event):
		"""Toggle the frame: restore it when iconized, iconize it otherwise."""
		if not self.frame.IsIconized():
			self.frame.Iconize(True)
			return
		self.frame.Iconize(False)
		self.frame.Show(True)
		self.frame.Raise()

	def OnAbout(self,event):
		"""Show the about box."""
		wx.MessageBox(u'這是一個迷你的ntp客戶端和伺服器,可以設定UTC時間的偏移量!', u'關於')

	def OnMinshow(self,event):
		"""Iconize the frame."""
		self.frame.Iconize(True)

	def OnMaxshow(self,event):
		"""Bring the frame back if needed, raise it and maximize it."""
		window = self.frame
		if window.IsIconized():
			window.Iconize(False)
		if not window.IsShown():
			window.Show(True)
		window.Raise()
		# show maximized
		window.Maximize(True)

	def OnCloseshow(self,event):
		"""Suspend logging, stop the server threads and close the frame."""
		wx.Log.Suspend()
		ntpserverstop()
		self.frame.Close(True)

	def CreatePopupMenu(self):
		"""Build the right-click menu."""
		menu = wx.Menu()
		entries = (
			(self.ID_Minshow, u'最小化'),
			(self.ID_Maxshow, u'最大化'),
			(self.ID_About, u'關於'),
			(self.ID_Closeshow, u'退出'),
		)
		for menu_id, label in entries:
			menu.Append(menu_id, label)
		return menu

class Frame(wx.Frame):
	"""Main window: a multi-line text control plus the task bar icon."""

	def __init__(
			self,
			parent=None,
			id=wx.ID_ANY,
			title='ntp server '+ntpserverversion,
			pos=wx.DefaultPosition,
			size=wx.DefaultSize,
			style=wx.DEFAULT_FRAME_STYLE
			):
		"""Create the text control, the task bar icon and the event bindings."""
		wx.Frame.__init__(self, parent, id, title, pos, size, style)
		self.textctrl = log_view = wx.TextCtrl(self, style=wx.TE_MULTILINE)
		log_view.SetMaxLength(30000)
		self.SetIcon(wx.Icon('profile/icon/ntp.ico', wx.BITMAP_TYPE_ICO))
		self.taskBarIcon = TaskBarIcon(self)
		# Event bindings. Minimizing the window through its minimize button
		# fires wx.EVT_ICONIZE; wx defines no wx.EVT_MINIMIZE, although
		# wx.EVT_MAXIMIZE exists for maximizing.
		bindings = (
			(wx.EVT_CLOSE, self.OnClose),
			(wx.EVT_ICONIZE, self.OnIconfiy),
			(wx.EVT_TEXT_MAXLEN, self.OnClear),
		)
		for event_type, handler in bindings:
			self.Bind(event_type, handler)

	def OnClear(self,event):
		"""Empty the text control; bound to the maximum-length event."""
		self.textctrl.Clear()

	def OnHide(self, event):
		"""Hide the window."""
		self.Hide()

	def OnIconfiy(self, event):
		"""Hide the window on an iconize event and let the event propagate."""
		self.Hide()
		event.Skip()

	def OnClose(self, event):
		"""Suspend logging, stop the server threads, then destroy the icon and the window."""
		wx.Log.Suspend()
		ntpserverstop()
		self.taskBarIcon.Destroy()
		self.Destroy()

	def write(self, s):
		"""Append s to the text control."""
		self.textctrl.AppendText(s)

		
def Main():
	"""Create the application and its log window, start the NTP server and run the event loop."""
	app = wx.App()
	window = Frame(size=(640, 480))
	# Route wx log messages into the window's text control, with a timestamp.
	log_target = wx.LogTextCtrl(window.textctrl)
	wx.Log.SetActiveTarget(log_target)
	wx.Log.SetTimestamp('%Y-%m-%d %H:%M:%S')
	window.Centre()
	window.Show()
	ntpserverstart()
	app.MainLoop()


if __name__ == '__main__':
	Main()

編譯 setup.py

#-*- coding: UTF-8 -*-
from distutils.core import setup
import py2exe

# Other library modules to include in the build.
includes = ["encodings", "encodings.*"]

# Settings passed to py2exe.
py2exe_options = dict(
	compressed=1,  # compress
	optimize=2,
	includes=includes,
	dll_excludes=["MSVCP90.dll"],
	bundle_files=1,  # pack all files into a single exe file
)
options = {"py2exe": py2exe_options}

# Resource files to ship, as (target directory, [source files]) pairs.
data_files = [
	("profile/icon", ["profile/icon/ntp.ico"]),
	("profile", ["profile/settings.json"]),
]

# The windowed program built from ntp.py, with the icon of the exe file.
gui_target = {
	"script": "ntp.py",
	"icon_resources": [(1, "profile/icon/ntp.ico")],
}

setup(
	options=options,
	zipfile=None,  # do not generate a library.zip file
	data_files=data_files,
	windows=[gui_target],
)

配置檔案 settings.json

{
	"utc_offset":8,
	"stratum":2,
	"syntime":1,
	"serverip":"10.30.1.105",
	"serverport":123,
	"poll_interval":6,
	"version":3
}