Building on my previous post, Using Scapy to send WLAN frames, I want to demonstrate how to establish a connection to an unencrypted SSID with a WLAN interface that we control. I highly recommend reading that post first if you aren’t already familiar with Scapy and Python.
Please note: The following commands were run on an Ubuntu 16.04 Linux system with a TP-Link TL-WN727N (v4.1) WLAN USB stick. This stick is able to send ACKs within the required timeout even though it is used in monitor mode, as long as its real MAC address is used as the STA MAC. The Scapy package can be installed as python-scapy (Ubuntu) or just scapy (Fedora); Wireshark is also required. Your Linux and Python skills should be quite solid and include knowledge of ifconfig and iwconfig on Linux as well as class definition and inheritance in Python.
Required packets for a WLAN connection
Without going into too much detail here, we basically just need two types of packets to connect to an AP – an Authentication frame and an Association Request.
Authentication Frame
For an authentication frame, Scapy already offers the required protocols Dot11() and Dot11Auth(). All we need to do is determine a STA MAC and a BSSID. With algo=0 the open system authentication algorithm is selected, seqnum=0x0001 sets the sequence number to 1 (the first frame of the authentication exchange), and status=0x0000 is the status code for success. This is the full command for the authentication frame:
packet = Dot11(addr1=[RECEIVER MAC], addr2=[SENDER MAC], addr3=[BSSID]) / Dot11Auth(algo=0, seqnum=0x0001, status=0x0000)
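To make the three Dot11Auth fields more tangible, here is a minimal sketch in plain Python, without Scapy, of how they end up in the frame body: three 16-bit little-endian values in the order algorithm, sequence number, status code. The helper name build_auth_body is made up for this illustration and is not part of Scapy.

```python
import struct


def build_auth_body(algo=0, seqnum=1, status=0):
    """Pack the fixed fields of an 802.11 Authentication frame body.

    Hypothetical helper for illustration only; Scapy's Dot11Auth layer
    does this for us. Each field is a 16-bit little-endian integer.
    """
    return struct.pack("<HHH", algo, seqnum, status)


body = build_auth_body(algo=0, seqnum=0x0001, status=0x0000)
print(body.hex())  # 000001000000
```

These six bytes should be what follows the 802.11 MAC header of our authentication frame in a capture.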
If we take a look at the packet in Wireshark, we can see that the given interpretation of the fields fits:
Association Request
This one is a rather difficult frame to create. It requires knowledge about parameters that are present in the Beacon like the supported rates field. This data has to be present within our association request, otherwise a LANCOM AP will reject the association of this client. In the following example we want to connect to an AP that supports .11g and .11n, but no .11b. If you are using a different AP or setting, some other rates/fields might be mandatory as well.
Let’s start with the basic packet that consists of Dot11, Dot11AssoReq and Dot11Elt. Besides the BSSID and STA MAC, the SSID NAME (ESSID) needs to be known as well to create the association request packet. The SSID is written to the info field of an 802.11 information element (Dot11Elt) with ID=0.
packet = Dot11(addr1=[RECEIVER MAC], addr2=[SENDER MAC], addr3=[BSSID])/Dot11AssoReq(cap=0x1100, listen_interval=0x00a) / Dot11Elt(ID=0, info="SSID NAME")
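An information element is a simple ID/length/payload structure. The following sketch, in plain Python and independent of Scapy, shows how the SSID element from the command above would be encoded. The helper build_element is a hypothetical name used only for this example; as far as I can tell, Scapy's Dot11Elt computes the length byte by itself when the packet is built, so we never have to set it by hand.

```python
def build_element(element_id, info):
    """Encode one 802.11 information element as ID, length, payload.

    Hypothetical helper for illustration; not part of Scapy.
    """
    if len(info) > 255:
        raise ValueError("element payload is limited to 255 bytes")
    return bytes([element_id, len(info)]) + info


ssid_element = build_element(0, "SSID NAME".encode("utf-8"))
print(ssid_element[0], ssid_element[1])  # 0 9
print(ssid_element[2:])
```

The first byte is the element ID (0 for the SSID), the second byte is the length of the SSID name, and the rest is the name itself.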
Again we take a look at the packet in Wireshark:
As mentioned before, we also need to add a supported rates information element that matches the one in the AP’s Beacons. We can simply sniff the Beacons and copy the values into our own definition of a supported rates element. How to define such a packet is described in the section Creating a customized packet of my previous post, so I will just show the Python code below.
from scapy.all import *


class Dot11EltRates(Packet):
    """ Our own definition for the supported rates field """
    name = "802.11 Rates Information Element"
    # Our Test STA supports the rates 6, 9, 12, 18, 24, 36, 48 and 54 Mbps
    supported_rates = [0x0c, 0x12, 0x18, 0x24, 0x30, 0x48, 0x60, 0x6c]
    fields_desc = [ByteField("ID", 1), ByteField("len", len(supported_rates))]
    for index, rate in enumerate(supported_rates):
        fields_desc.append(ByteField("supported_rate{0}".format(index + 1), rate))
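The hex values in supported_rates may look arbitrary at first. Each byte holds the rate in units of 500 kbit/s in its lower seven bits, and the top bit marks a basic (mandatory) rate. A small sketch to decode them, with decode_rate being a helper name I made up for this example:

```python
def decode_rate(value):
    """Return (rate in Mbps, is_basic) for one supported-rates byte.

    The lower 7 bits hold the rate in units of 500 kbit/s; the top bit
    marks a basic (mandatory) rate.
    """
    is_basic = bool(value & 0x80)
    mbps = (value & 0x7F) * 0.5
    return mbps, is_basic


supported_rates = [0x0c, 0x12, 0x18, 0x24, 0x30, 0x48, 0x60, 0x6c]
for value in supported_rates:
    print(hex(value), decode_rate(value))

# A basic rate of 12 Mbps would be sent as 0x98 (0x18 with the top bit set).
print(decode_rate(0x98))
```

This also explains the values 12, 18, 24 and so on that Scapy prints for the element: they are the raw bytes, so 12 stands for 6 Mbps and 108 for 54 Mbps.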
Python Files
Receiving and Sending packets
We create a Python file to receive and send packets with our monitoring interface and name it “monitor_ifc.py”. First of all, we add our rates information element as a class named “Dot11EltRates” and then create a new class called “Monitor”. The class is initialized with the name of the monitor interface, a STA MAC and the BSSID to connect to. The boolean variables “auth_found” and “assoc_found” are used to record whether authentication/association responses were seen. An instance of our Dot11EltRates class is assigned to the dot11_rates variable.
from scapy.all import *


class Dot11EltRates(Packet):
    """ Our own definition for the supported rates field """
    name = "802.11 Rates Information Element"
    # Our Test AP has the rates 6, 9, 12 (B), 18, 24, 36, 48 and 54, with 12
    # Mbps as the basic rate - which does not have to concern us.
    supported_rates = [0x0c, 0x12, 0x18, 0x24, 0x30, 0x48, 0x60, 0x6c]
    fields_desc = [
        ByteField("ID", 1),
        ByteField("len", len(supported_rates))
    ]
    for index, rate in enumerate(supported_rates):
        fields_desc.append(ByteField("supported_rate{0}".format(
            index + 1), rate))


class Monitor:
    def __init__(self, mon_ifc, sta_mac, bssid):
        """
        :param mon_ifc: WLAN interface to use as a monitor
        :param sta_mac: MAC address of the STA
        :param bssid: BSSID of the AP to attack
        """
        self.mon_ifc = mon_ifc
        self.sta_mac = sta_mac
        self.bssid = bssid
        self.auth_found = False
        self.assoc_found = False
        self.dot11_rates = Dot11EltRates()
Our first method of the Monitor class is “send_packet”, which simply sends a given Scapy packet. However, if the packet type is an Association Request, we append the dot11_rates information element prior to sending it.
    def send_packet(self, packet, packet_type=None):
        """
        :param packet_type: Specific types require a special handling
        :param packet: This is our packet to be sent
        :return:
        """
        # Send out the packet
        if packet_type is None:
            send(packet)
        elif packet_type == "AssoReq":
            packet /= self.dot11_rates
            send(packet)
        else:
            print("Packet Type '{0}' unknown".format(packet_type))
Next we need to check whether the AP sends an Authentication Response. The method “search_auth()” uses the sniff() function from Scapy to look for packets matching the lfilter, where x is a sniffed packet, until either the timeout expires or the stop_filter returns True. The lfilter checks the frame type via “x.haslayer(Dot11Auth)”, so the stop filter “check_auth()” only has to compare the addresses in the frame with the ones we defined. The parameter mp_queue is a multiprocessing queue that is used to pass the result, True or False in this case, back from the child process.
    def check_auth(self, packet):
        """
        Try to find the Authentication from the AP

        :param packet: sniffed packet to check for matching authentication
        """
        seen_receiver = packet[Dot11].addr1
        seen_sender = packet[Dot11].addr2
        seen_bssid = packet[Dot11].addr3

        if self.bssid == seen_bssid and \
                self.bssid == seen_sender and \
                self.sta_mac == seen_receiver:
            self.auth_found = True
            print("Detected Authentication from Source {0}".format(
                seen_bssid))
        return self.auth_found

    def search_auth(self, mp_queue):
        print("\nScanning max 5 seconds for Authentication "
              "from BSSID {0}".format(self.bssid))
        sniff(iface=self.mon_ifc, lfilter=lambda x: x.haslayer(Dot11Auth),
              stop_filter=self.check_auth,
              timeout=5)
        mp_queue.put(self.auth_found)
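The address comparison in check_auth() is a plain string comparison, so it only matches if both sides use the same spelling. The sketch below isolates that check as a standalone function and normalizes everything to lowercase first. The function name and its parameters are hypothetical and only serve this example; the assumption is that sniffed addresses are reported as lowercase strings, which is why main() lowercases the MACs later on.

```python
def is_response_for_us(addr1, addr2, addr3, sta_mac, bssid):
    """Check the three 802.11 addresses of a frame sent by the AP to our STA.

    addr1 is the receiver, addr2 the sender and addr3 the BSSID, as in
    check_auth(). All values are compared case-insensitively.
    """
    sta_mac = sta_mac.lower()
    bssid = bssid.lower()
    return (addr1.lower() == sta_mac
            and addr2.lower() == bssid
            and addr3.lower() == bssid)


# Made-up example addresses
print(is_response_for_us(
    addr1="60:e3:27:00:11:22",
    addr2="00:a0:57:aa:bb:cc",
    addr3="00:a0:57:aa:bb:cc",
    sta_mac="60:E3:27:00:11:22",
    bssid="00:A0:57:AA:BB:CC"))
```

Without the normalization, an uppercase MAC typed by the user would silently never match and the sniffer would simply run into its timeout.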
Checking for an Association Response works in nearly the same way, except that we check whether the packet has the Dot11AssoResp layer.
    def check_assoc(self, packet):
        """
        Try to find the Association Response from the AP

        :param packet: sniffed packet to check for matching association
        """
        seen_receiver = packet[Dot11].addr1
        seen_sender = packet[Dot11].addr2
        seen_bssid = packet[Dot11].addr3

        if self.bssid == seen_bssid and \
                self.bssid == seen_sender and \
                self.sta_mac == seen_receiver:
            self.assoc_found = True
            print("Detected Association Response from Source {0}".format(
                seen_bssid))
        return self.assoc_found

    def search_assoc_resp(self, mp_queue):
        print("\nScanning max 5 seconds for Association Response "
              "from BSSID {0}".format(self.bssid))
        sniff(iface=self.mon_ifc,
              lfilter=lambda x: x.haslayer(Dot11AssoResp),
              stop_filter=self.check_assoc,
              timeout=5)
        mp_queue.put(self.assoc_found)
To wrap things up, here is the complete code of the “monitor_ifc.py”:
from scapy.all import *


class Dot11EltRates(Packet):
    """ Our own definition for the supported rates field """
    name = "802.11 Rates Information Element"
    # Our Test AP has the rates 6, 9, 12 (B), 18, 24, 36, 48 and 54, with 12
    # Mbps as the basic rate - which does not have to concern us.
    supported_rates = [0x0c, 0x12, 0x18, 0x24, 0x30, 0x48, 0x60, 0x6c]
    fields_desc = [
        ByteField("ID", 1),
        ByteField("len", len(supported_rates))
    ]
    for index, rate in enumerate(supported_rates):
        fields_desc.append(ByteField("supported_rate{0}".format(
            index + 1), rate))


class Monitor:
    def __init__(self, mon_ifc, sta_mac, bssid):
        """
        :param mon_ifc: WLAN interface to use as a monitor
        :param sta_mac: MAC address of the STA
        :param bssid: BSSID of the AP to attack
        """
        self.mon_ifc = mon_ifc
        self.sta_mac = sta_mac
        self.bssid = bssid
        self.auth_found = False
        self.assoc_found = False
        self.dot11_rates = Dot11EltRates()

    def send_packet(self, packet, packet_type=None):
        """
        Send and display a packet.

        :param packet_type: Specific types require a special handling
        :param packet: This is our packet to be sent
        :return:
        """
        # Send out the packet
        if packet_type is None:
            send(packet)
        elif packet_type == "AssoReq":
            packet /= self.dot11_rates
            send(packet)
        else:
            print("Packet Type '{0}' unknown".format(packet_type))

    def check_auth(self, packet):
        """
        Try to find the Authentication from the AP

        :param packet: sniffed packet to check for matching authentication
        """
        seen_receiver = packet[Dot11].addr1
        seen_sender = packet[Dot11].addr2
        seen_bssid = packet[Dot11].addr3

        if self.bssid == seen_bssid and \
                self.bssid == seen_sender and \
                self.sta_mac == seen_receiver:
            self.auth_found = True
            print("Detected Authentication from Source {0}".format(
                seen_bssid))
        return self.auth_found

    def check_assoc(self, packet):
        """
        Try to find the Association Response from the AP

        :param packet: sniffed packet to check for matching association
        """
        seen_receiver = packet[Dot11].addr1
        seen_sender = packet[Dot11].addr2
        seen_bssid = packet[Dot11].addr3

        if self.bssid == seen_bssid and \
                self.bssid == seen_sender and \
                self.sta_mac == seen_receiver:
            self.assoc_found = True
            print("Detected Association Response from Source {0}".format(
                seen_bssid))
        return self.assoc_found

    def search_auth(self, mp_queue):
        print("\nScanning max 5 seconds for Authentication "
              "from BSSID {0}".format(self.bssid))
        sniff(iface=self.mon_ifc, lfilter=lambda x: x.haslayer(Dot11Auth),
              stop_filter=self.check_auth,
              timeout=5)
        mp_queue.put(self.auth_found)

    def search_assoc_resp(self, mp_queue):
        print("\nScanning max 5 seconds for Association Response "
              "from BSSID {0}".format(self.bssid))
        sniff(iface=self.mon_ifc,
              lfilter=lambda x: x.haslayer(Dot11AssoResp),
              stop_filter=self.check_assoc,
              timeout=5)
        mp_queue.put(self.assoc_found)
Establish the connection
So let’s move on to the actual connection establishment. We create a new file called “connection_phase.py”, import multiprocessing, scapy and the Monitor class from the “monitor_ifc.py”. Then we create the class “ConnectionPhase” with a connection state as a string, the monitor interface to be used, the STA MAC and the BSSID.
import multiprocessing
# sys and time are used in main(); they are imported explicitly here
# instead of relying on the wildcard import from scapy.all
import sys
import time

from scapy.all import *

from monitor_ifc import Monitor


class ConnectionPhase:
    """
    Establish a connection to the AP via the following commands
    """

    def __init__(self, monitor_ifc, sta_mac, bssid):
        self.state = "Not Connected"
        self.mon_ifc = monitor_ifc
        self.sta_mac = sta_mac
        self.bssid = bssid
Our first method creates an authentication packet and uses a list of jobs for multiprocessing. Since we want to send the packet and check if we receive an answer from the AP, we parallelize the sending and receiving jobs via multiprocessing.Process(target=[…],args=(arg1, …)) and add them to the list of jobs which will be started and joined afterwards. If the return value in our message queue is True, we will set the internal state to “Authenticated”.
    def send_authentication(self):
        """
        Send an Authentication Request and wait for the Authentication
        Response. Which works if the user defined Station MAC matches the
        one of the WLAN ifc itself.

        :return: -
        """
        packet = Dot11(
            addr1=self.bssid,
            addr2=self.sta_mac,
            addr3=self.bssid) / Dot11Auth(
                algo=0, seqnum=0x0001, status=0x0000)

        packet.show()

        jobs = list()
        result_queue = multiprocessing.Queue()
        receive_process = multiprocessing.Process(
            target=self.mon_ifc.search_auth,
            args=(result_queue, ))
        jobs.append(receive_process)
        send_process = multiprocessing.Process(
            target=self.mon_ifc.send_packet,
            args=(packet, ))
        jobs.append(send_process)

        for job in jobs:
            job.start()
        for job in jobs:
            job.join()

        if result_queue.get():
            self.state = "Authenticated"
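The send/receive pattern used here can be tried out without any WLAN hardware. The following sketch replaces the sniffer and the sender with two stand-in functions (fake_receiver and fake_sender, both made up for this example) and uses an Event to simulate the AP's response. It assumes that the "fork" start method is available, as it is on Linux. Unlike the method above, it reads the queue before joining the processes, which is the more defensive order if larger results are ever passed back.

```python
import multiprocessing
import time

# Assumption: the "fork" start method is available (Linux), so this sketch
# runs without an `if __name__ == "__main__":` guard.
mp = multiprocessing.get_context("fork")


def fake_receiver(trigger, result_queue):
    """Stand-in for search_auth(): wait up to 5 seconds for a 'response'."""
    result_queue.put(trigger.wait(timeout=5))


def fake_sender(trigger):
    """Stand-in for send_packet(): the 'response' follows shortly after."""
    time.sleep(0.1)
    trigger.set()


def run_exchange():
    trigger = mp.Event()
    result_queue = mp.Queue()
    jobs = [
        mp.Process(target=fake_receiver, args=(trigger, result_queue)),
        mp.Process(target=fake_sender, args=(trigger,)),
    ]
    for job in jobs:
        job.start()
    # Read the result before joining, so that a child can never block on
    # a queue that nobody is draining.
    found = result_queue.get(timeout=10)
    for job in jobs:
        job.join()
    return found


print(run_exchange())
```

The receiver is started first, just like in send_authentication(), so that it is already listening when the sender fires.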
For the Association Request frame most of the code is the same. We first check that our internal state is “Authenticated” before creating the frame based on Dot11AssoReq from Scapy. The parameter “ssid” becomes part of the 802.11 information element that carries the SSID name (ESSID). If we get a True in our message queue, the internal state is set to “Associated”.
    def send_assoc_request(self, ssid):
        """
        Send an Association Request and wait for the Association Response.
        Which works if the user defined Station MAC matches the one of the
        wlan ifc itself.

        :param ssid: Name of the SSID (ESSID)
        :return: -
        """
        if self.state != "Authenticated":
            print("Wrong connection state for Association Request: {0} "
                  "- should be Authenticated".format(self.state))
            return 1

        packet = Dot11(
            addr1=self.bssid,
            addr2=self.sta_mac,
            addr3=self.bssid) / Dot11AssoReq(
                cap=0x1100, listen_interval=0x00a) / Dot11Elt(
                    ID=0, info="{}".format(ssid))

        packet.show()

        jobs = list()
        result_queue = multiprocessing.Queue()
        receive_process = multiprocessing.Process(
            target=self.mon_ifc.search_assoc_resp,
            args=(result_queue,))
        jobs.append(receive_process)
        send_process = multiprocessing.Process(
            target=self.mon_ifc.send_packet,
            args=(packet, "AssoReq", ))
        jobs.append(send_process)

        for job in jobs:
            job.start()
        for job in jobs:
            job.join()

        if result_queue.get():
            self.state = "Associated"
Last but not least we need code to establish the connection by calling these methods from the ConnectionPhase class. In this example, a main function within “connection_phase.py” will do this. We use fixed values for our monitoring interface, STA MAC, BSSID and ESSID, but it is also possible to hand them over as arguments from the command line. Run “iwconfig” to get the full name of the monitor WLAN interface and “ifconfig” to get its MAC. Furthermore, the BSSID and the ESSID have to be known.
def main():
    monitor_ifc = "wlx60e327xxyyzz"
    sta_mac = "60:e3:27:xx:yy:zz"
    bssid = "00:a0:57:aa:bb:cc"
    conf.iface = monitor_ifc

    # MACs are converted to always use lowercase
    mon_ifc = Monitor(monitor_ifc, sta_mac.lower(), bssid.lower())

    connection = ConnectionPhase(mon_ifc, sta_mac, bssid)
    connection.send_authentication()
    if connection.state == "Authenticated":
        print("STA is authenticated to the AP!")
    else:
        print("STA is NOT authenticated to the AP!")

    time.sleep(1)
    connection.send_assoc_request(ssid="SSID-NAME")
    if connection.state == "Associated":
        print("STA is connected to the AP!")
    else:
        print("STA is NOT connected to the AP!")


if __name__ == "__main__":
    sys.exit(main())
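If you prefer command line arguments over fixed values, a sketch with argparse from the standard library could look like the following. The option names (--ifc, --sta-mac, --bssid, --ssid) are my own choice for this example and are not part of the tool above; the defaults simply mirror the placeholder values from main().

```python
import argparse


def parse_args(argv=None):
    """Read interface, MACs and SSID from the command line.

    The option names are made up for this sketch; the defaults mirror
    the fixed placeholder values used in main().
    """
    parser = argparse.ArgumentParser(
        description="Connect to an unencrypted SSID via a monitor interface")
    parser.add_argument("--ifc", default="wlx60e327xxyyzz",
                        help="name of the monitor interface")
    # type=str.lower converts the MACs to lowercase while parsing
    parser.add_argument("--sta-mac", default="60:e3:27:xx:yy:zz",
                        type=str.lower, help="MAC address of the STA")
    parser.add_argument("--bssid", default="00:a0:57:aa:bb:cc",
                        type=str.lower, help="BSSID of the AP")
    parser.add_argument("--ssid", default="SSID-NAME",
                        help="name of the SSID (ESSID)")
    return parser.parse_args(argv)


args = parse_args(["--bssid", "00:A0:57:AA:BB:CC", "--ssid", "Lab"])
print(args.ifc, args.sta_mac, args.bssid, args.ssid)
```

Inside main(), the resulting values would then replace the hard-coded monitor_ifc, sta_mac, bssid and ssid.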
Our “connection_phase.py” is now complete. The full code is given below.
import multiprocessing
# sys and time are used in main(); they are imported explicitly here
# instead of relying on the wildcard import from scapy.all
import sys
import time

from scapy.all import *

from monitor_ifc import Monitor


class ConnectionPhase:
    """
    Establish a connection to the AP via the following commands
    """

    def __init__(self, monitor_ifc, sta_mac, bssid):
        self.state = "Not Connected"
        self.mon_ifc = monitor_ifc
        self.sta_mac = sta_mac
        self.bssid = bssid

    def send_authentication(self):
        """
        Send an Authentication Request and wait for the Authentication
        Response. Which works if the user defined Station MAC matches the
        one of the wlan ifc itself.

        :return: -
        """
        packet = Dot11(
            addr1=self.bssid,
            addr2=self.sta_mac,
            addr3=self.bssid) / Dot11Auth(
                algo=0, seqnum=0x0001, status=0x0000)

        packet.show()

        jobs = list()
        result_queue = multiprocessing.Queue()
        receive_process = multiprocessing.Process(
            target=self.mon_ifc.search_auth,
            args=(result_queue, ))
        jobs.append(receive_process)
        send_process = multiprocessing.Process(
            target=self.mon_ifc.send_packet,
            args=(packet, ))
        jobs.append(send_process)

        for job in jobs:
            job.start()
        for job in jobs:
            job.join()

        if result_queue.get():
            self.state = "Authenticated"

    def send_assoc_request(self, ssid):
        """
        Send an Association Request and wait for the Association Response.
        Which works if the user defined Station MAC matches the one of the
        wlan ifc itself.

        :param ssid: Name of the SSID (ESSID)
        :return: -
        """
        if self.state != "Authenticated":
            print("Wrong connection state for Association Request: {0} "
                  "- should be Authenticated".format(self.state))
            return 1

        packet = Dot11(
            addr1=self.bssid,
            addr2=self.sta_mac,
            addr3=self.bssid) / Dot11AssoReq(
                cap=0x1100, listen_interval=0x00a) / Dot11Elt(
                    ID=0, info="{}".format(ssid))

        packet.show()

        jobs = list()
        result_queue = multiprocessing.Queue()
        receive_process = multiprocessing.Process(
            target=self.mon_ifc.search_assoc_resp,
            args=(result_queue,))
        jobs.append(receive_process)
        send_process = multiprocessing.Process(
            target=self.mon_ifc.send_packet,
            args=(packet, "AssoReq", ))
        jobs.append(send_process)

        for job in jobs:
            job.start()
        for job in jobs:
            job.join()

        if result_queue.get():
            self.state = "Associated"


def main():
    monitor_ifc = "wlx60e327xxyyzz"
    sta_mac = "60:e3:27:xx:yy:zz"
    bssid = "00:a0:57:aa:bb:cc"
    conf.iface = monitor_ifc

    # mac configuration per command line arguments, MACs are converted to
    # always use lowercase
    mon_ifc = Monitor(monitor_ifc, sta_mac.lower(), bssid.lower())

    connection = ConnectionPhase(mon_ifc, sta_mac, bssid)
    connection.send_authentication()
    if connection.state == "Authenticated":
        print("STA is authenticated to the AP!")
    else:
        print("STA is NOT authenticated to the AP!")

    time.sleep(1)
    connection.send_assoc_request(ssid="SSID-NAME")
    if connection.state == "Associated":
        print("STA is connected to the AP!")
    else:
        print("STA is NOT connected to the AP!")


if __name__ == "__main__":
    sys.exit(main())
We can now execute the code in a Linux bash via “sudo python3 connection_phase.py”. The sudo rights are required to send and receive with our monitor interface. Given below is the output of our tool; the MACs are made up for security reasons.
###[ 802.11 ]###
  subtype   = 11
  type      = Management
  proto     = 0
  FCfield   =
  ID        = 0
  addr1     = 00:a0:57:aa:bb:cc
  addr2     = 60:e3:27:xx:yy:zz
  addr3     = 00:a0:57:aa:bb:cc
  SC        = 0
  addr4     = 00:00:00:00:00:00
###[ 802.11 Authentication ]###
     algo      = open
     seqnum    = 1
     status    = success

Scanning max 5 seconds for Authentication from BSSID 00:a0:57:aa:bb:cc
.
Sent 1 packets.
Detected Authentication from Source 00:a0:57:aa:bb:cc
STA is authenticated to the AP!
###[ 802.11 ]###
  subtype   = 0
  type      = Management
  proto     = 0
  FCfield   =
  ID        = 0
  addr1     = 00:a0:57:aa:bb:cc
  addr2     = 60:e3:27:xx:yy:zz
  addr3     = 00:a0:57:aa:bb:cc
  SC        = 0
  addr4     = 00:00:00:00:00:00
###[ 802.11 Association Request ]###
     cap       = ESS+privacy
     listen_interval= 10
###[ 802.11 Information Element ]###
        ID        = SSID
        len       = None
        info      = 'SSID-NAME'
###[ 802.11 Rates Information Element ]###
           ID        = 1
           len       = 8
           supported_rate1= 12
           supported_rate2= 18
           supported_rate3= 24
           supported_rate4= 36
           supported_rate5= 48
           supported_rate6= 72
           supported_rate7= 96
           supported_rate8= 108

Scanning max 5 seconds for Association Response from BSSID 00:a0:57:aa:bb:cc
.
Sent 1 packets.
Detected Association Response from Source 00:a0:57:aa:bb:cc
STA is connected to the AP!
From this point on, you can send any frame, even frames that usually require a valid connection, to interact with the AP. I hope this little tool is useful to you. If you have any questions, feel free to ask them in the comment section. Thanks for reading.