Sunday 28 October 2012

Using Scapy to test PPPoE AC-Cookie validation

AC-Cookies are a mechanism designed to help mitigate certain denial of service attacks against PPPoE access concentrators. To understand the function it is important to first understand the normal flow of the PPPoE discovery process, which is as follows:

  1. The PPPoE client sends a broadcast PADI (initiate) message
  2. Any PPPoE access concentrators willing to service the client respond with a unicast PADO (offer) message
  3. The client selects which access concentrator to use and unicasts a PADR (request) message asking for a session to be established
  4. The access concentrator unicasts a PADS (session) message to the client to indicate that the session has been established
If an attacker is able to spoof PADI and PADR messages from a number of MAC addresses, a large amount of PPPoE state can be created in the access concentrator. An AC-Cookie is an unpredictable (to the client) value which is attached to the PADO message which must be echoed back in the PADR in order for it to be accepted by the access concentrator. Since the AC-Cookie cannot be predicted by the client, if the correct value is echoed back to the concentrator then it is extremely unlikely to have been spoofed and it is therefore safe for the access concentrator to allocate resources to the session.

This is all well and good but what if you need to prove the mechanism works or to show what error messages that are generated on the receipt of invalid AC-Cookies? As usual with my blog posts I have had to do this so I thought I would share the code. It's not going to win any awards but it works, all you need is scapy (I use 2.0.1, later should be fine).

Usage is pretty straightforward; simply run scapy, instantiate an object of type PPPoESession, override options as appropriate and then instruct it to "run()".

For example, to verify that a valid PPPoE session will come up:

root@client-pc:~/Projects/PPPoED# scapy
Welcome to Scapy (2.0.1)
>>> execfile("PPPoED.py")

>>> p=PPPoESession()
>>> p.outif="eth0"
>>> p.run()
[ debugging messages removed ]

Received PADS
>>>

Once the PADS is received, the process is complete and control returns to the console.

To verify that the access concentrator checks the value of AC-Cookies returned in PADR messages, we can set the script to reply using garbage values for the AC-Cookie tag as follows:

root@client-pc:~/Projects/PPPoED# scapy
Welcome to Scapy (2.0.1)
>>> execfile("PPPoED.py")
>>> p=PPPoESession()
>>> p.randomcookie=True
>>> p.retries=200
>>> p.run()

This will send a normal PADI and wait for a PADO before sending, up to the configured number of retries, PADR messages with randomised AC-Cookie tag values. When a PADS is received or the number of retries is exceeded, control returns to the console.

References

RFC 2516, Section 9 - http://tools.ietf.org/html/rfc2516

Code

import os
class PPPoESession(Automaton):
  randomcookie = False
  retries = 100
  outif="eth1"
  mac="00:10:20:30:40:50"
  hu="\x7a\x0e\x00\x00"
  ac_cookie=""
  ac_mac="ff:ff:ff:ff:ff:ff"
  our_magic="\x01\x23\x45\x67"
  their_magic="\x00\x00\x00\x00"
  sess_id = 0
# Method to recover an AC-Cookie from the tags
  def getcookie(self, payload):
    loc = 0
    while(loc < len(payload)):
      att_type = payload[loc:loc+2]
      att_len = (256 * ord(payload[loc+2:loc+3])) + ord(payload[loc+3:loc+4])
      print "Got attribute " + str(ord(att_type[:1])).zfill(2) + str(ord(att_type[1:])).zfill(2)  + " of length " + str(att_len) + " value " + payload[loc+4:loc+4+att_len]
      if att_type == "\x01\x04":
        self.ac_cookie = payload[loc+4:loc+4+att_len]
        print "Got AC-Cookie of " + self.ac_cookie
        break
      loc = loc + att_len + 4
# Define possible states
  @ATMT.state(initial=1)
  def START(self):
    pass
  @ATMT.state()
  def WAIT_PADO(self):
    pass
  @ATMT.state()
  def GOT_PADO(self):
    pass
  @ATMT.state()
  def WAIT_PADS(self):
    pass
  @ATMT.state(error=1)
  def ERROR(self):
    pass
  @ATMT.state(final=1)
  def END(self):
    pass
# Define transitions
# Transitions from START
  @ATMT.condition(START)
  def send_padi(self):
    print "Send PADI"
    sendp(Ether(src=self.mac, dst="ff:ff:ff:ff:ff:ff")/PPPoED()/Raw(load='\x01\x01\x00\x00'+'\x01\x03\x00\x04'+self.hu),iface=self.outif)
    raise self.WAIT_PADO()
# Transitions from WAIT_PADO
  @ATMT.timeout(WAIT_PADO, 3)
  def timeout_pado(self):
    print "Timed out waiting for PADO"
    self.retries -= 1
    if(self.retries < 0):
      print "Too many retries, aborting."
      raise self.ERROR()
    raise self.START()
  @ATMT.receive_condition(WAIT_PADO)
  def receive_pado(self,pkt):
    if (PPPoED in pkt) and (pkt[PPPoED].code==7):
      print "Received PADO"
      self.ac_mac=pkt[Ether].src
      self.getcookie(pkt[Raw].load)
      raise self.GOT_PADO()
#
# Transitions from GOT_PADO
  @ATMT.condition(GOT_PADO)
  def send_padr(self):
    print "Send PADR"
    if(self.randomcookie):
      print "Random cookie being used"
      self.ac_cookie=os.urandom(16)
    sendp(Ether(src=self.mac, dst=self.ac_mac)/PPPoED(code=25)/Raw(load='\x01\x01\x00\x00'+'\x01\x03\x00\x04'+self.hu+'\x01\x04\x00'+chr(len(self.ac_cookie))+self.ac_cookie),iface=self.outif)
    raise self.WAIT_PADS()
#
# Transitions from WAIT_PADS
  @ATMT.timeout(WAIT_PADS, 1)
  def timeout_pads(self):
    print "Timed out waiting for PADS"
    self.retries -= 1
    if(self.retries < 0):
      print "Too many retries, aborting."
      raise self.ERROR()
    raise self.GOT_PADO()
  @ATMT.receive_condition(WAIT_PADS)
  def receive_pads(self,pkt):
    if (PPPoED in pkt) and (pkt[PPPoED].code==101):
      print "Received PADS"
      self.sess_id = pkt[PPPoED].sessionid
      raise self.END()
  @ATMT.receive_condition(WAIT_PADS)
  def receive_padt(self,pkt):
    if (PPPoED in pkt) and (pkt[PPPoED].code==167):
      print "Received PADT"
      raise self.ERROR()