python – Simple SYN Port Scan Using Scapy


I thought that I should have a basic grasp of Scapy, so I decided to try writing up a port scanner.

>>> timeit(lambda: print(port_scan("192.168.123.123", range(15000), timeout=5)), number=1)
(80, 11111)
32.23571190000001

I’ve made naïve scanners before that were basically just connecting sockets and seeing if you get errors, but those are “noisy” in that each attempt is more likely to get logged. This version sends a SYN to the server to initiate the TCP handshake, then looks at the response. If it’s a SYN-ACK, something is accepting connections on the port, and if it’s anything else, they aren’t (or at least they aren’t from us, right now).

There isn’t a ton of code here, but this is my first time using Scapy, so I’d appreciate tips on it or anything else. It’s painfully slow, so if the packet construction or sending could be sped up, I’d appreciate that. Normally I’d use a thread pool here, but I have no idea what all Scapy is doing behind the scenes, so I haven’t tried that yet. I’m trying to used a “cached” ip_layer in new_scan_packets as well, but that doesn’t seem to have helped.

from typing import Collection, List
from random import randint, shuffle
from scapy.layers.inet import IP, TCP
from scapy.sendrecv import sr

SEQ_MAX = 2**32 - 1
EPHEMERAL_RANGE = (2**14 + 2**15, 2**16 - 1)  # According to the IANA

SYN_FLAG = "S"
SYN_ACK_FLAG = SYN_FLAG + "A"
ALL_PORTS = range(EPHEMERAL_RANGE(1) + 1)
DEFAULT_TIMEOUT = 3


def new_scan_packets(address: str, ports: Collection(int)) -> List(TCP):
    ip_layer = IP(dst=address)
    return (ip_layer / TCP(sport=randint(*EPHEMERAL_RANGE), dport=port, seq=randint(0, SEQ_MAX - 1), flags=SYN_FLAG)
            for port in ports)


def port_scan(address: str, ports: Collection(int), shuffled: bool = True, **kwargs) -> List(int):
    """
    Scans the ports in the given collection and finds which are accepting connections.
    Returns a list of ports that were found to be open.
    kwargs are passed directly to sr to support options like "delay" and "timeout".
    """
    kwargs.setdefault("timeout", DEFAULT_TIMEOUT)  # Because the scan could hang indefinitely otherwise
    syns = new_scan_packets(address, ports)

    if shuffled:
        shuffle(syns)

    answered, _ = sr(syns, verbose=False, **kwargs)
    return sorted(stimulus(TCP).dport
                  for stimulus, response in answered
                  if response(TCP).flags.flagrepr() == SYN_ACK_FLAG)