import unittest import time, sys, random from socket import IPPROTO_TCP import signal from pcs.packets.localhost import * from pcs.packets.ethernet import * from pcs.packets.ipv4 import ipv4 from pcs.packets.tcp import tcp from pcs import * if __name__ == '__main__': if "-l" in sys.argv: sys.path.insert(0, "../") # Look locally first sys.argv.remove("-l") # Needed because unittest has issues alarm_triggered = 0 def alarm_handler(signal, frame): print "alarm triggered" alarm_triggered = 1 def set_alarm(seconds): alarm_triggered = 0 signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(seconds) def disable_alarm(): signal.alarm(0) def setup_ip(src, dst): ip = ipv4() ip.version = 4 ip.hlen = 5 ip.tos = 0 ip.id = 13000 ip.flags = 0 ip.offset = 0 ip.ttl = 64 ip.checksum = 0 ip.protocol = IPPROTO_TCP ip.src = src ip.dst = dst return ip def setup_tcp_syn(sport, dport): tcppacket = tcp() tcppacket.sport = sport tcppacket.dport = dport tcppacket.sequence = 42 tcppacket.offset = 5; tcppacket.syn = 1 tcppacket.push = 0 tcppacket.window = 65535 tcppacket.checksum = 0 return tcppacket def setup_tcp_ack(sport, dport, ackno): tcppacket = tcp() tcppacket.sport = sport tcppacket.dport = dport tcppacket.sequence = 43 tcppacket.offset = 5; tcppacket.ack = 1 tcppacket.ack_number = ackno tcppacket.window = 65535 tcppacket.checksum = 0 return tcppacket def setup_tcp_cksum(ippkt, tcppkt): ippkt.length = len(ippkt.getbytes()) + len(tcppkt.getbytes()) ippkt.checksum = ippkt.cksum() tcppkt.checksum = tcppkt.cksum(ippkt) def setup_ether(src, dst): ether = ethernet() ether.src = src ether.dst = dst ether.type = 2048 return ether def write_tcp_ip_ether_pkt(output, packet, tcppkt, ippkt, etherpkt): return output.write(packet.bytes, len(etherpkt.getbytes()) + len(ippkt.getbytes()) + len(tcppkt.getbytes())) def get_dut_port(dut_ip, server_port): sockobj = socket(AF_INET, SOCK_STREAM) sockobj.connect((dut_ip, server_port)) portno = sockobj.recv(8) print "dut port ", portno return (sockobj, int(portno)) def get_source_port(host, start_portno): # :-/ - the stack will send RSTs on bound sockets that aren't listening sockobj = socket(AF_INET, SOCK_STREAM) portno = start_portno + random.randrange(1, 1000) return (sockobj, portno) print "trying for source port" while 1: try: sockobj.bind((host, start_portno)) except: print "bumping start_portno ", start_portno start_portno += 1 else: break print "source port is ", start_portno return (sockobj, start_portno) def check_timeout(start, timeout): now = time.time() if now - timeout > start: return 1 return 0 class tcpv4_test: """A Class for testing TCP for IPv4""" def __init__(self, src, sport, dst, dport, src_ether, dut_ether, device): self.src = inet_atol(src) self.src_addr = src self.sport = sport self.dut_ip = inet_atol(dst) self.dut_addr = dst; self.dut_server_port = dport self.src_ether = src_ether self.dut_ether = dut_ether self.device = device # send a SYN over loopback def test_tcpv4_syn_local(self): src = inet_atol("127.0.0.1") dst = inet_atol("127.0.0.1") print "testing local" ip = ipv4() ip.version = 4 ip.hlen = 5 ip.tos = 0 ip.id = 13000 ip.flags = 0 ip.offset = 0 ip.ttl = 64 ip.checksum = 0 ip.protocol = IPPROTO_TCP ip.src = src ip.dst = dst tcppacket = tcp() tcppacket.sport = int(self.sport) tcppacket.dport = int(self.dut_server_port) tcppacket.sequence = 42 tcppacket.offset = 5; tcppacket.syn = 1 tcppacket.push = 0 tcppacket.window = 4096 tcppacket.checksum = 0 lo = localhost() lo.type = 2 ip.length = len(ip.getbytes()) + len(tcppacket.getbytes()) ip.checksum = ip.cksum() tcppacket.checksum = tcppacket.cksum(ip) packet = Chain([lo, ip, tcppacket]) packet.encode() #print packet input = PcapConnector("lo0") input.setfilter("tcp"); output = PcapConnector("lo0") #out = output.write(packet.bytes, len(lo.getbytes()) + len(ip.getbytes()) + len(tcppacket.getbytes())) #print "wrote %d bytes"%out # send a syn over to a remote host over ethernet # - test the syncache timer by confirming that we # get 4 SYNs back in less than 60s def test_tcpv4_syn_remote(self): print "testing remote" (remote_sockobj, dut_port) = get_dut_port(self.dut_addr, self.dut_server_port) (local_sockobj, sport) = get_source_port(self.src_addr, self.sport) ippkt = setup_ip(self.src, self.dut_ip) tcppkt = setup_tcp_syn(sport, dut_port) setup_tcp_cksum(ippkt, tcppkt) etherpkt = setup_ether(self.src_ether, self.dut_ether) packet = Chain([etherpkt, ippkt, tcppkt]) packet.encode() instream = PcapConnector(self.device) instream.setfilter("tcp"); output = PcapConnector(self.device) out = write_tcp_ip_ether_pkt(output, packet, tcppkt, ippkt, etherpkt) syncount = 0 start_time = time.time() while syncount < 4: newpkt = instream.readpkt() if check_timeout(start_time, 60): breakk try: newpkt = newpkt.data.data except: continue if newpkt.dport == tcppkt.sport and newpkt.ack_number == (tcppkt.sequence + 1): print "matching SYN" syncount += 1 if syncount >= 4: print "syncache test passed" else: print "syncache test failed" remote_sockobj.close() local_sockobj.close() # Syncookie test # # Send a SYN # Record the seq # in the SYN,ACK # Send a RST, so that the syncache drops the connection # Then send the ACK, that should hit the syncookie code # Run many times, with different time offsets; the cookies # should only be valid for a limited time. # 1.19 # TEST_DESCRIPTION # TCP MUST send a FIN on a CLOSE call in ESTABLISHED, SYN-RCVD or # CLOSE-WAIT state # M> TEST_REFERENCE # RFC 793 s3.2 p23 Terminology # M> TEST_METHOD # - ANVL: Cause the DUT to move on to state # - ANVL: Cause the application on the DUT-side to issue a CLOSE call # - DUT: Send a FIN # - CASE: = ESTABLISHED # - CASE: = SYN-RCVD # - CASE: = CLOSE-WAIT # establish control connection to test app # # move test app over test connection into -> {SYN-RCVD, ESTABLISHED, CLOSE-WAIT} # tell test app to issue close on test connection # check that a FIN is received def test_tcpv4_1_19_syn_rcvd(self): print "executing 1.19" (sockobj, dut_port) = get_dut_port(self.dut_addr, self.dut_server_port) (local_sockobj, sport) = get_source_port(self.src_addr, self.sport) # setup input / output to ethernet card output = PcapConnector(self.device) instream = PcapConnector(self.device) instream.setfilter("tcp"); # send initial syn ippkt = setup_ip(self.src, self.dut_ip) tcppkt = setup_tcp_syn(sport, dut_port) setup_tcp_cksum(ippkt, tcppkt) etherpkt = setup_ether(self.src_ether, self.dut_ether) packet = Chain([etherpkt, ippkt, tcppkt]) packet.encode() out = write_tcp_ip_ether_pkt(output, packet, tcppkt, ippkt, etherpkt) # wait up to 60s receive syn back start_time = time.time() while 1: newpkt = instream.readpkt() if check_timeout(start_time, 60): return 1 try: newpkt = newpkt.data.data except: continue if newpkt.dport == tcppkt.sport and newpkt.ack_number == (tcppkt.sequence + 1) and newpkt.syn == 1: break print "got SYN" # DUT is in SYN_RCVD - tell DUT server thread to close test connection sockobj.send("close_exit") # wait up to 60s for the FIN start_time = time.time() while 1: newpkt = instream.readpkt() if check_timeout(start_time, 60): break try: newpkt = newpkt.data.data except: continue if newpkt.dport == tcppkt.sport and newpkt.ack_number == (tcppkt.sequence + 1) and newpkt.syn == 0: if newpkt.fin == 1: print "1_19 success" elif newpkt.reset == 1: print "got RST ... hrrrm" return 0 print "1_19_syn_rcvd failed" def test_tcpv4_1_19_established(self): print "executing 1.19" (sockobj, dut_port) = get_dut_port(self.dut_addr, self.dut_server_port) (local_sockobj, sport) = get_source_port(self.src_addr, self.sport) # setup input / output to ethernet card output = PcapConnector(self.device) instream = PcapConnector(self.device) instream.setfilter("tcp"); # send initial syn ippkt = setup_ip(self.src, self.dut_ip) tcppkt = setup_tcp_syn(sport, dut_port) setup_tcp_cksum(ippkt, tcppkt) etherpkt = setup_ether(self.src_ether, self.dut_ether) packet = Chain([etherpkt, ippkt, tcppkt]) packet.encode() out = write_tcp_ip_ether_pkt(output, packet, tcppkt, ippkt, etherpkt) # wait up to 60s receive syn back start_time = time.time() while 1: newpkt = instream.readpkt() if check_timeout(start_time, 60): return 1 try: newpkt = newpkt.data.data except: continue if newpkt.dport == tcppkt.sport and newpkt.ack_number == (tcppkt.sequence + 1) and newpkt.syn == 1: break print "got SYN" # send ack of syn ippkt = setup_ip(self.src, self.dut_ip) tcppkt = setup_tcp_ack(sport, dut_port, newpkt.sequence + 1) setup_tcp_cksum(ippkt, tcppkt) etherpkt = setup_ether(self.src_ether, self.dut_ether) packet = Chain([etherpkt, ippkt, tcppkt]) packet.encode() out = write_tcp_ip_ether_pkt(output, packet, tcppkt, ippkt, etherpkt) print packet print "sent ack" time.sleep(1) # DUT is in SYN_RCVD - tell DUT server thread to close test connection sockobj.send("close_exit") # wait up to 60s for the FIN start_time = time.time() while 1: newpkt = instream.readpkt() if check_timeout(start_time, 60): break try: newpkt = newpkt.data.data except: continue if newpkt.dport == tcppkt.sport and newpkt.ack_number == (tcppkt.sequence + 1) and newpkt.syn == 0: if newpkt.fin == 1: print "1_19 success" elif newpkt.reset == 1: print "got RST ... hrrrm" return 0 print "1_19 failed" # # 1.23 #M> TEST_DESCRIPTION # TCP MUST move on to CLOSED state from TIME-WAIT state after a timeout # of 2*MSL, where TIME-WAIT is reached through CLOSING state # M> TEST_REFERENCE # RFC 793 s3.2 p23 Terminology # M> TEST_METHOD # - ANVL: Cause the DUT to move on to CLOSING state # - ANVL: Send ACK for the FIN just received from the DUT # - ANVL: Send a FIN after 2*MSL + 20%% # - DUT: Send a RST segment(this will indicate DUT is in CLOSED # state) # # validate recycling # # - Open control connection to test app # - Open test connection to test app # - Tell app to close test connection # - ack FIN # to test TIME_WAIT # - send FIN every 10s # - check that FIN is either ignored or acked # - wait 2.4*MSL then send FIN # - check that an RST is received # 9.21 # M> TEST_DESCRIPTION # TCP, in CLOSING, LAST-ACK or TIME-WAIT state, MUST return to # CLOSED state on receiving a RST # M> TEST_REFERENCE # RFC 793 s3.9 p70 Event Processing # M> TEST_METHOD # - ANVL: Cause the DUT to move on to state # - ANVL: Send a RST segment # - DUT: Goes to CLOSED state # - ANVL: Verify that the DUT moves on to CLOSED state # - CASE: = CLOSING # - CASE: = LAST-ACK # - CASE: = TIME-WAIT # # - establish control connection to test app # # - tear down connection through {CLOSING, TIME_WAIT, LAST_ACK} # - send RST to DUT # 11.22 # TCP MAY accept a new SYN from the remote TCP in TIME-WAIT state # M> TEST_REFERENCE # RFC 1122 s4.2.2.13 p88 Closing a connection: RFC-793 Section 3.5 # M> TEST_METHOD # - ANVL: Cause the DUT to move on to TIME-WAIT state through # state # - ANVL: Send a SYN on the same port # - DUT: Send SYN,ACK # - ANVL: Send ACK and verify that the DUT moves on to # ESTABLISHED state # - CASE: = FINWAIT-2 # - CASE: = CLOSING # # - establish control connection # - establish test connection # - tell DUT to close connection # - move DUT to TIME-WAIT through CLOSING by sending FIN then ACK # - move DUT to TIME-WAIT through FIN_WAIT by send ACK then FIN # 11.29 # TCP, in CLOSE-WAIT state MUST inform the application # in case of aborting from remote site # M> TEST_REFERENCE # RFC 1122 s4.2.2.13 p87 Closing a connection: RFC-793 Section 3.5 # M> TEST_METHOD # - DUT: Move on to CLOSE-WAIT state # - ANVL: Cause the application on the DUT-side to issue a RECEIVE call # - ANVL: Send a RST # - ANVL: Verify that the receiving application receives the signal # that "connection reset" has occurred from the remote site # # # - establish control connection # # - establish test connection # - tell DUT to close its side of the connection # - tell DUT to issue receive call # - send RST to DUT # - check that receive call returns ECONNRESET # 16.17 # # A TCP MAY keep its offered receive window closed indefinitely # M> TEST_REFERENCE # RFC 1122 s4.2.2.17 p92 Probing Zero Windows: RFC-793 Section 3.7, # page 42 # M> TEST_METHOD # - ANVL: Cause DUT to move on to ESTABLISHED state # - ANVL: Send enough data segments to fill the receiver window of DUT # where the receiving application is not asked to extract any # data # - DUT: Send ACKs with the last window update shown to be zero # - ANVL: Verify that even after a long time (2*MSL) the DUT remains # in the ESTABLISHED state # # - establish control connection # # - establish test connection with connecting side using sockets API this time # - send a window's worth of data # - have server send ACKs with window update set to be zero # - have client probe for window size macbook = ('192.168.1.4', "\x00\x1b\x63\xc6\x30\x35") storage = ('192.168.1.160', "\x00\x07\xe9\x03\xad\x88") class tcpv4TestCase(unittest.TestCase): def test_simple_syn(self): #(dut_ip, dut_ether) = macbook (dut_ip, dut_ether) = storage test = tcpv4_test("192.168.1.151", 54000, dut_ip, 45678, "\x00\x17\x31\xef\x61\xda", dut_ether, "em0") # test.test_tcpv4_syn_local() # test.test_tcpv4_syn_remote() # test.test_tcpv4_1_19_syn_rcvd() test.test_tcpv4_1_19_established() print "done" if __name__ == '__main__': unittest.main()