aboutsummaryrefslogtreecommitdiff
path: root/ethsteal/ping.py
blob: 3eaa77f8c821d2d6ca30efd64fd711aa6104fdd4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python

"""
    Original source: https://raw.githubusercontent.com/samuel/python-ping/master/ping.py
"""

import os, sys, socket, struct, select, time, string, random


class ICMP():
	ICMP_ECHO_REQUEST = 8
	def __init__(self):
		self.ident = os.getpid() & 0xFFFF
		icmp = socket.getprotobyname("icmp")
		try:
			self.icmp_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
		except socket.error:
			if socket.errno == EACCES:
				# Operation not permitted
				msg = socket.strerror + (
					" - Note that ICMP messages can only be sent from processes"
					" running as root."
				)
				raise socket.error(msg)
			raise # raise the original error

	def __del__(self):
		self.icmp_sock.close()
		del self.icmp_sock, self.ident

	def checksum(self, source_string):
		sum = 0
		countTo = (len(source_string)/2)*2
		count = 0
		while count<countTo:
			thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
			sum = sum + thisVal
			sum = sum & 0xffffffff # Necessary?
			count = count + 2

		if countTo<len(source_string):
			sum = sum + ord(source_string[len(source_string) - 1])
			sum = sum & 0xffffffff # Necessary?

		sum = (sum >> 16)  +  (sum & 0xffff)
		sum = sum + (sum >> 16)
		answer = ~sum
		answer = answer & 0xffff

		# Swap bytes. Bugger me if I know why.
		answer = answer >> 8 | (answer << 8 & 0xff00)

		return answer

	def receive_one_ping(self, ID, timeout):
		timeLeft = timeout
		while True:
			startedSelect = time.time()
			whatReady = select.select([self.icmp_sock], [], [], timeLeft)
			howLongInSelect = (time.time() - startedSelect)
			if whatReady[0] == []: # Timeout
				return

			timeReceived = time.time()
			recPacket, addr = self.icmp_sock.recvfrom(1024)
			icmpHeader = recPacket[20:28]
			type, code, checksum, packetID, sequence = struct.unpack("bbHHh", icmpHeader)
			# Filters out the echo request itself. 
			# This can be tested by pinging 127.0.0.1 
			# You'll see your own request
			if type != 8 and packetID == ID:
				bytesInDouble = struct.calcsize("d")
				timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]
				return timeReceived - timeSent

			timeLeft = timeLeft - howLongInSelect
			if timeLeft <= 0:
				return

	def send_one_ping(self, dest_addr, ID):
		"""
		Send one ping to the given >dest_addr<.
		"""
		dest_addr  =  socket.gethostbyname(dest_addr)
		# Header is type (8), code (8), checksum (16), id (16), sequence (16)
		my_checksum = 0
		# Make a dummy heder with a 0 checksum.
		header = struct.pack("bbHHh", self.ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
		bytesInDouble = struct.calcsize("d")
		data = (192 - bytesInDouble) * random.choice(string.letters)
		data = struct.pack("d", time.time()) + data
		# Calculate the checksum on the data and the dummy header.
		my_checksum = self.checksum(header + data)
		# Now that we have the right checksum, we put that in. It's just easier
		# to make up a new header than to stuff it into the dummy.
		header = struct.pack("bbHHh", self.ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1)
		packet = header + data
		self.icmp_sock.sendto(packet, (dest_addr, 1)) # Don't know about the 1
	
	def do_one(self, dest_addr, timeout):
		"""
		Returns either the delay (in seconds) or none on timeout.
		"""
		self.send_one_ping(dest_addr, self.ident)
		delay = self.receive_one_ping(self.ident, timeout)
		return delay

	def verbose_ping(self, dest_addr, timeout = 2, count = 4):
		"""
		Send >count< ping to >dest_addr< with the given >timeout< and display
		the result.
		"""
		for i in xrange(count):
			print('ping %s...' % dest_addr,)
			try:
				delay  =  self.do_one(dest_addr, timeout)
			except socket.gaierror as e:
				print('failed. (socket error: "%s")' % e[1])
				break

			if delay  ==  None:
				print('failed. (timeout within %ssec.)' % timeout)
			else:
				delay  =  delay * 1000
				print('get ping in %0.4fms' % delay)
		print


if __name__ == '__main__':
	icmp = ICMP()
	icmp.verbose_ping("heise.de")
	icmp.verbose_ping("google.com")
	icmp.verbose_ping("a-test-url-taht-is-not-available.com")
	icmp.verbose_ping("192.168.1.1")
	del icmp