diff --git a/EVSE.py b/EVSE.py index b782e24..dea12a2 100644 --- a/EVSE.py +++ b/EVSE.py @@ -30,6 +30,8 @@ class EVSE: def __init__(self, args): self.mode = RunMode(args.mode[0]) if args.mode else RunMode.FULL + self.skip_slac = True if args.skip_slac else False + self.disable_i2c = True if args.disable_i2c else False self.iface = args.interface[0] if args.interface else "eth1" self.sourceMAC = args.source_mac[0] if args.source_mac else "00:1e:c0:f2:6c:a0" self.sourceIP = args.source_ip[0] if args.source_ip else "fe80::21e:c0ff:fef2:6ca0" @@ -59,10 +61,12 @@ def __init__(self, args): self.exi = EXIProcessor(self.protocol) self.slac = _SLACHandler(self) + self.sdp = _SDPHandler(self) self.tcp = _TCPHandler(self) # I2C bus for relays - self.bus = SMBus(1) + if not self.disable_i2c: + self.bus = SMBus(1) # Constants for i2c controlled relays self.I2C_ADDR = 0x20 @@ -74,10 +78,13 @@ def __init__(self, args): # Start the emulator def start(self): # Initialize the I2C bus for wwrite - self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00) + if not self.disable_i2c: + self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00) self.toggleProximity() - self.doSLAC() + if not self.skip_slac: + self.doSLAC() + self.doSDP() self.doTCP() # If NMAP is not done, restart connection if not self.tcp.finishedNMAP: @@ -86,6 +93,9 @@ def start(self): # Close the circuit for the proximity pins def closeProximity(self): + if self.disable_i2c: + return + if self.modified_cordset: print("INFO (EVSE): Closing CP/PP relay connections") self.bus.write_byte_data(self.I2C_ADDR, self.CONTROL_REG, self.EVSE_PP | self.EVSE_CP) @@ -95,6 +105,9 @@ def closeProximity(self): # Close the circuit for the proximity pins def openProximity(self): + if self.disable_i2c: + return + print("INFO (EVSE): Opening CP/PP relay connections") self.bus.write_byte_data(self.I2C_ADDR, self.CONTROL_REG, self.ALL_OFF) @@ -109,6 +122,10 @@ def doTCP(self): self.tcp.start() print("INFO (EVSE): Done TCP") + def doSDP(self): + self.sdp.start() + print("INFO (EVSE): Done SDP") + # Starts SLAC thread that handles layer 2 comms def doSLAC(self): self.slac.start() @@ -155,22 +172,13 @@ def startSniff(self): sniff(iface=self.iface, prn=self.handlePacket, stop_filter=self.stopSniff) def stopSniff(self, pkt): - if pkt.haslayer("SECC_RequestMessage"): - print("INDO (EVSE): Recieved SECC_RequestMessage") - # self.evse.destinationMAC = pkt[Ether].src - # use this to send 3 secc responses incase car doesnt see one - self.destinationIP = pkt[IPv6].src - self.destinationPort = pkt[UDP].sport - Thread(target=self.sendSECCResponse).start() + if pkt.haslayer("CM_SLAC_MATCH_REQ"): + print("INFO (EVSE): Recieved SLAC_MATCH_REQ") + print("INFO (EVSE): Sending SLAC_MATCH_CNF") + sendp(self.buildSlacMatchCnf(), iface=self.iface, verbose=0) self.stop = True return self.stop - def sendSECCResponse(self): - time.sleep(0.2) - for i in range(3): - print("INFO (EVSE): Sending SECC_ResponseMessage") - sendp(self.buildSECCResponse(), iface=self.iface, verbose=0) - def handlePacket(self, pkt): if pkt[Ether].type != 0x88E1 or pkt[Ether].src == self.sourceMAC: return @@ -189,11 +197,6 @@ def handlePacket(self, pkt): print("INFO (EVSE): Sending ATTEN_CHAR_IND") sendp(self.buildAttenCharInd(), iface=self.iface, verbose=0) - if pkt.haslayer("CM_SLAC_MATCH_REQ"): - print("INFO (EVSE): Recieved SLAC_MATCH_REQ") - print("INFO (EVSE): Sending SLAC_MATCH_CNF") - sendp(self.buildSlacMatchCnf(), iface=self.iface, verbose=0) - def buildSlacParmCnf(self): ethLayer = Ether() ethLayer.src = self.sourceMAC @@ -348,13 +351,41 @@ def buildSetKey(self): responsePacket = ethLayer / homePlugAVLayer / homePlugLayer return responsePacket +class _SDPHandler: + def __init__(self, evse: EVSE): + self.evse = evse + + self.destinationIP = None + self.destinationPort = None + + def start(self): + sniff(iface=self.evse.iface, stop_filter=self.stopSniff) + + def stopSniff(self, pkt): + if pkt.haslayer("SECC_RequestMessage"): + print("INDO (EVSE): Recieved SECC_RequestMessage") + # self.evse.destinationMAC = pkt[Ether].src + # use this to send 3 secc responses incase car doesnt see one + self.destinationIP = pkt[IPv6].src + self.destinationPort = pkt[UDP].sport + Thread(target=self.sendSECCResponse).start() + return True + return False + + def sendSECCResponse(self): + time.sleep(0.2) + for i in range(3): + print("INFO (EVSE): Sending SECC_ResponseMessage") + sendp(self.buildSECCResponse(), iface=self.evse.iface, verbose=0) + + def buildSECCResponse(self): e = Ether() - e.src = self.sourceMAC - e.dst = self.destinationMAC + e.src = self.evse.sourceMAC + e.dst = self.evse.destinationMAC ip = IPv6() - ip.src = self.sourceIP + ip.src = self.evse.sourceIP ip.dst = self.destinationIP udp = UDP() @@ -367,8 +398,8 @@ def buildSECCResponse(self): seccRM = SECC_ResponseMessage() seccRM.SecurityProtocol = 16 - seccRM.TargetPort = self.sourcePort - seccRM.TargetAddress = self.sourceIP # eno1 + seccRM.TargetPort = self.evse.sourcePort + seccRM.TargetAddress = self.evse.sourceIP # eno1 responsePacket = e / ip / udp / secc / seccRM return responsePacket @@ -693,6 +724,8 @@ def buildNeighborAdvertisement(self): parser.add_argument("--nmap-mac", nargs=1, help="The MAC address of the target device to NMAP scan (default: EVCC MAC address)") parser.add_argument("--nmap-ip", nargs=1, help="The IP address of the target device to NMAP scan (default: EVCC IP address)") parser.add_argument("--nmap-ports", nargs=1, help="List of ports to scan seperated by commas (ex. 1,2,5-10,19,...) (default: Top 8000 common ports)") + parser.add_argument("--skip-slac", action="store_true", help="Set this option when not using QCA based powerline chip. You will have to handle slac externally. (default: False)") + parser.add_argument("--disable-i2c", action="store_true", help="Set this option when not using the original AcCCS hardware, i.e. no I2C bus is present. (default: False)") parser.add_argument("--modified-cordset", action="store_true", help="Set this option when using a modified cordset during testing of a target vehicle. The AcCCS system will provide a 150 ohm ground on the proximity line to reset the connection. (default: False)") args = parser.parse_args() diff --git a/PEV.py b/PEV.py index fdde028..40ca909 100644 --- a/PEV.py +++ b/PEV.py @@ -32,6 +32,8 @@ class PEV: def __init__(self, args): self.mode = RunMode(args.mode[0]) if args.mode else RunMode.FULL + self.skip_slac = True if args.skip_slac else False + self.disable_i2c = True if args.disable_i2c else False self.iface = args.interface[0] if args.interface else "eth1" self.sourceMAC = args.source_mac[0] if args.source_mac else "00:1e:c0:f2:6c:a1" self.sourceIP = args.source_ip[0] if args.source_ip else "fe80::21e:c0ff:fef2:6ca1" @@ -56,10 +58,12 @@ def __init__(self, args): self.exi = EXIProcessor(self.protocol) self.slac = _SLACHandler(self) + self.sdp = _SDPHandler(self) self.tcp = _TCPHandler(self) # I2C bus for relays - self.bus = SMBus(1) + if not self.disable_i2c: + self.bus = SMBus(1) # Constants for i2c controlled relays self.I2C_ADDR = 0x20 @@ -71,10 +75,13 @@ def __init__(self, args): def start(self): # Initialize the smbus for I2C commands - self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00) + if not self.disable_i2c: + self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00) self.toggleProximity() - self.doSLAC() + if not self.skip_slac: + self.doSLAC() + self.doSDP() self.doTCP() # If NMAP is not done, restart connection if not self.tcp.finishedNMAP: @@ -85,6 +92,12 @@ def doTCP(self): self.tcp.start() print("INFO (PEV) : Done TCP") + def doSDP(self): + print("INFO (PEV) : Starting SDP") + self.sdp.start() + self.sdp.sniffThread.join() + print("INFO (PEV) : Done SDP") + def doSLAC(self): print("INFO (PEV) : Starting SLAC") self.slac.start() @@ -98,6 +111,10 @@ def openProximity(self): self.setState(PEVState.A) def setState(self, state: PEVState): + if self.disable_i2c: + print(f"INFO (PEV) : State transition to {state}") + return + if state == PEVState.A: print("INFO (PEV) : Going to state A") self.bus.write_byte_data(self.I2C_ADDR, self.CONTROL_REG, self.ALL_OFF) @@ -160,11 +177,18 @@ def startSniff(self): # Stop the thread when the slac match is done def stopSniff(self, pkt): - if pkt.haslayer("SECC_ResponseMessage"): - self.pev.destinationIP = pkt[SECC_ResponseMessage].TargetAddress - self.pev.destinationPort = pkt[SECC_ResponseMessage].TargetPort + if pkt.haslayer("CM_SLAC_MATCH_CNF"): + print("INFO (PEV) : Recieved SLAC_MATCH_CNF") + self.NID = pkt[CM_SLAC_MATCH_CNF].VariableField.NetworkID + self.NMK = pkt[CM_SLAC_MATCH_CNF].VariableField.NMK + print("INFO (PEV) : Sending SET_KEY_REQ") + sendp(self.buildSetKeyReq(), iface=self.iface, verbose=0) + time.sleep(3) # give modem some time to reboot + if self.neighborSolicitationThread.running: self.neighborSolicitationThread.stop() + + self.stop = True return True return False @@ -202,21 +226,6 @@ def handlePacket(self, pkt): self.timeSinceLastPkt = time.time() return - if pkt.haslayer("CM_SLAC_MATCH_CNF"): - print("INFO (PEV) : Recieved SLAC_MATCH_CNF") - self.NID = pkt[CM_SLAC_MATCH_CNF].VariableField.NetworkID - self.NMK = pkt[CM_SLAC_MATCH_CNF].VariableField.NMK - print("INFO (PEV) : Sending SET_KEY_REQ") - sendp(self.buildSetKeyReq(), iface=self.iface, verbose=0) - self.stop = True - Thread(target=self.sendSECCRequest).start() - return - - def sendSECCRequest(self): - time.sleep(3) - print("INFO (PEV) : Sending 3 SECC_RequestMessage") - for i in range(1): - sendp(self.buildSECCRequest(), iface=self.iface, verbose=0) def sendSounds(self): self.numRemainingSounds = self.numSounds @@ -344,31 +353,6 @@ def buildSetKeyReq(self): responsePacket = ethLayer / homePlugAVLayer / homePlugLayer return responsePacket - def buildSECCRequest(self): - ethLayer = Ether() - ethLayer.src = self.sourceMAC - ethLayer.dst = "33:33:00:00:00:01" - - ipLayer = IPv6() - ipLayer.src = self.sourceIP - ipLayer.dst = "ff02::1" - ipLayer.hlim = 255 - - udpLayer = UDP() - udpLayer.sport = self.pev.sourcePort - udpLayer.dport = 15118 - - seccLayer = SECC() - seccLayer.SECCType = 0x9000 - seccLayer.PayloadLen = 2 - - seccRequestLayer = SECC_RequestMessage() - seccRequestLayer.SecurityProtocol = 16 - seccRequestLayer.TransportProtocol = 0 - - responsePacket = ethLayer / ipLayer / udpLayer / seccLayer / seccRequestLayer - return responsePacket - def buildNeighborAdvertisement(self): ethLayer = Ether() ethLayer.src = self.sourceMAC @@ -402,6 +386,60 @@ def sendNeighborSoliciation(self, pkt): # print("INFO (EVSE): Sending Neighor Advertisement") sendp(self.buildNeighborAdvertisement(), iface=self.iface, verbose=0) +class _SDPHandler: + def __init__(self, pev: PEV): + self.pev = pev + + # This method starts the slac process and will stop + def start(self): + self.runID = os.urandom(8) + self.stop = False + # Thread for sniffing packets and handling responses + # self.sniffThread = Thread(target=self.startSniff) + # self.sniffThread.start() + + self.sniffThread = AsyncSniffer(iface=self.pev.iface, stop_filter=self.stopSniff) + self.sniffThread.start() + + self.sendSECCRequest() + + # Stop the thread when the slac match is done + def stopSniff(self, pkt): + if pkt.haslayer("SECC_ResponseMessage"): + self.pev.destinationIP = pkt[SECC_ResponseMessage].TargetAddress + self.pev.destinationPort = pkt[SECC_ResponseMessage].TargetPort + return True + return False + + def sendSECCRequest(self): + print("INFO (PEV) : Sending 3 SECC_RequestMessage") + for i in range(1): + sendp(self.buildSECCRequest(), iface=self.pev.iface, verbose=0) + + def buildSECCRequest(self): + ethLayer = Ether() + ethLayer.src = self.pev.sourceMAC + ethLayer.dst = "33:33:00:00:00:01" + + ipLayer = IPv6() + ipLayer.src = self.pev.sourceIP + ipLayer.dst = "ff02::1" + ipLayer.hlim = 255 + + udpLayer = UDP() + udpLayer.sport = self.pev.sourcePort + udpLayer.dport = 15118 + + seccLayer = SECC() + seccLayer.SECCType = 0x9000 + seccLayer.PayloadLen = 2 + + seccRequestLayer = SECC_RequestMessage() + seccRequestLayer.SecurityProtocol = 16 + seccRequestLayer.TransportProtocol = 0 + + responsePacket = ethLayer / ipLayer / udpLayer / seccLayer / seccRequestLayer + return responsePacket class _TCPHandler: def __init__(self, pev: PEV): @@ -778,6 +816,8 @@ def buildNeighborAdvertisement(self): parser.add_argument("--nmap-mac", nargs=1, help="The MAC address of the target device to NMAP scan (default: SECC MAC address)") parser.add_argument("--nmap-ip", nargs=1, help="The IP address of the target device to NMAP scan (default: SECC IP address)") parser.add_argument("--nmap-ports", nargs=1, help="List of ports to scan seperated by commas (ex. 1,2,5-10,19,...) (default: Top 8000 common ports)") + parser.add_argument("--skip-slac", action="store_true", help="Set this option when not using QCA based powerline chip. You will have to handle slac externally. (default: False)") + parser.add_argument("--disable-i2c", action="store_true", help="Set this option when not using the original AcCCS hardware, i.e. no I2C bus is present. (default: False)") args = parser.parse_args() pev = PEV(args)