I thought that I should have a basic grasp of Scapy, so I decided to try writing up a port scanner.
    >>> timeit(lambda: print(port_scan("192.168.123.123", range(15000), timeout=5)), number=1)
    [80, 11111]
    32.23571190000001
I’ve made naïve scanners before that were basically just connecting sockets and seeing if you get errors, but those are “noisy” in that each attempt is more likely to get logged. This version sends a SYN to the server to initiate the TCP handshake, then looks at the response. If it’s a SYN-ACK, something is accepting connections on the port; if it’s anything else, nothing is (or at least not from us, right now).
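As a rough illustration of that decision rule, here is a minimal sketch that works on the raw TCP flags byte rather than on Scapy objects. The bit values follow the standard TCP header layout; `is_open` and the constant names are hypothetical helpers for this example only, not part of the scanner below.

```python
# TCP flag bits as they appear in the low byte of the TCP header's flags field.
FIN = 0x01
SYN = 0x02
RST = 0x04
PSH = 0x08
ACK = 0x10

SYN_ACK = SYN | ACK


def is_open(response_flags):
    """Interpret the reply to a SYN probe.

    response_flags is the flags byte of the TCP reply, or None if
    nothing came back before the timeout.
    """
    if response_flags is None:
        return False  # no answer: filtered, dropped, or host down
    # Only an exact SYN-ACK counts as open; RST-ACK (0x14) means closed.
    return response_flags == SYN_ACK


print(is_open(SYN | ACK))  # True: something is listening
print(is_open(RST | ACK))  # False: port refused the connection
print(is_open(None))       # False: no reply at all
```

The exact-match comparison mirrors the string comparison against "SA" used in the scanner: any other flag combination is treated as "not open".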
There isn’t a ton of code here, but this is my first time using Scapy, so I’d appreciate tips on it or anything else. It’s painfully slow (about 32 seconds for 15,000 ports in the run above), so if the packet construction or sending could be sped up, I’d appreciate that. Normally I’d use a thread pool here, but I have no idea what all Scapy is doing behind the scenes, so I haven’t tried that yet. I’m also trying to reuse a “cached” ip_layer in new_scan_packets, but that doesn’t seem to have helped.
    from typing import Collection, List
    from random import randint, shuffle

    from scapy.layers.inet import IP, TCP
    from scapy.sendrecv import sr

    SEQ_MAX = 2**32 - 1
    EPHEMERAL_RANGE = (2**14 + 2**15, 2**16 - 1)  # According to the IANA
    SYN_FLAG = "S"
    SYN_ACK_FLAG = SYN_FLAG + "A"

    ALL_PORTS = range(EPHEMERAL_RANGE[1] + 1)
    DEFAULT_TIMEOUT = 3


    def new_scan_packets(address: str, ports: Collection[int]) -> List[TCP]:
        ip_layer = IP(dst=address)
        return [ip_layer / TCP(sport=randint(*EPHEMERAL_RANGE), dport=port, seq=randint(0, SEQ_MAX - 1), flags=SYN_FLAG)
                for port in ports]


    def port_scan(address: str, ports: Collection[int], shuffled: bool = True, **kwargs) -> List[int]:
        """
        Scans the ports in the given collection and finds which are accepting connections.
        Returns a list of ports that were found to be open.
        kwargs are passed directly to sr to support options like "delay" and "timeout".
        """
        kwargs.setdefault("timeout", DEFAULT_TIMEOUT)  # Because the scan could hang indefinitely otherwise

        syns = new_scan_packets(address, ports)
        if shuffled:
            shuffle(syns)

        answered, _ = sr(syns, verbose=False, **kwargs)

        return sorted(stimulus[TCP].dport
                      for stimulus, response in answered
                      if response[TCP].flags.flagrepr() == SYN_ACK_FLAG)
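For reference, the thread-pool idea mentioned earlier could be sketched roughly as follows. This is only an outline under assumptions: `chunked`, `threaded_scan`, and `fake_scan` are hypothetical names, and `fake_scan` stands in for something like `lambda ps: port_scan(address, ps)`. Whether Scapy's `sr` behaves well when called from several threads at once is exactly the open question, so the sketch deliberately does not call Scapy.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def chunked(ports: Sequence[int], size: int) -> List[Sequence[int]]:
    """Split ports into consecutive slices of at most `size` items."""
    return [ports[i:i + size] for i in range(0, len(ports), size)]


def threaded_scan(scan_batch: Callable[[Sequence[int]], List[int]],
                  ports: Sequence[int],
                  batch_size: int = 1000,
                  workers: int = 8) -> List[int]:
    """Run scan_batch over slices of ports in a thread pool and merge the results."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(scan_batch, chunked(ports, batch_size))
        return sorted(port for batch in results for port in batch)


def fake_scan(batch: Sequence[int]) -> List[int]:
    """Stand-in scanner (assumption): pretends ports 80 and 11111 are open."""
    return [p for p in batch if p in (80, 11111)]


print(threaded_scan(fake_scan, range(15000)))  # [80, 11111]
```

Passing the scan function in as a parameter keeps the batching logic testable without sending any packets; the batch size and worker count are arbitrary placeholders, not tuned values.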