From 224394bc9ee2ae3f11b3414c833cd61d8776f359 Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Wed, 11 Mar 2026 23:15:11 +0100 Subject: [PATCH 1/4] [MS-NRPC]: authenticators for Kerberos --- scapy/layers/msrpce/msnrpc.py | 79 +++++++++++++++++++---------------- scapy/layers/windows/erref.py | 1 + 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/scapy/layers/msrpce/msnrpc.py b/scapy/layers/msrpce/msnrpc.py index 1b47623e6ad..f88d67cce6b 100644 --- a/scapy/layers/msrpce/msnrpc.py +++ b/scapy/layers/msrpce/msnrpc.py @@ -620,27 +620,33 @@ def create_authenticator(self): """ Create a NETLOGON_AUTHENTICATOR """ - # [MS-NRPC] sect 3.1.4.5 - ts = int(time.time()) - self.ClientStoredCredential = _credentialAddition( - self.ClientStoredCredential, ts - ) - return PNETLOGON_AUTHENTICATOR( - Credential=PNETLOGON_CREDENTIAL( - data=( - ComputeNetlogonCredentialAES( - self.ClientStoredCredential, - self.SessionKey, - ) - if self.supportAES - else ComputeNetlogonCredentialDES( - self.ClientStoredCredential, - self.SessionKey, - ) + if isinstance(self.ssp, NetlogonSSP): + # [MS-NRPC] sect 3.1.4.5 + ts = int(time.time()) + self.ClientStoredCredential = _credentialAddition( + self.ClientStoredCredential, ts + ) + return PNETLOGON_AUTHENTICATOR( + Credential=PNETLOGON_CREDENTIAL( + data=( + ComputeNetlogonCredentialAES( + self.ClientStoredCredential, + self.SessionKey, + ) + if self.supportAES + else ComputeNetlogonCredentialDES( + self.ClientStoredCredential, + self.SessionKey, + ) + ), ), - ), - Timestamp=ts, - ) + Timestamp=ts, + ) + elif isinstance(self.ssp, KerberosSSP): + # Kerberos. + return PNETLOGON_AUTHENTICATOR() + else: + raise ValueError("Invalid ssp case !") def validate_authenticator(self, auth): """ @@ -648,20 +654,26 @@ def validate_authenticator(self, auth): :param auth: the NETLOGON_AUTHENTICATOR object """ - # [MS-NRPC] sect 3.1.4.5 - self.ClientStoredCredential = _credentialAddition( - self.ClientStoredCredential, 1 - ) - if self.supportAES: - tempcred = ComputeNetlogonCredentialAES( - self.ClientStoredCredential, self.SessionKey + if isinstance(self.ssp, NetlogonSSP): + # [MS-NRPC] sect 3.1.4.5 + self.ClientStoredCredential = _credentialAddition( + self.ClientStoredCredential, 1 ) + if self.supportAES: + tempcred = ComputeNetlogonCredentialAES( + self.ClientStoredCredential, self.SessionKey + ) + else: + tempcred = ComputeNetlogonCredentialDES( + self.ClientStoredCredential, self.SessionKey + ) + if tempcred != auth.Credential.data: + raise ValueError("Server netlogon authenticator is wrong !") + elif isinstance(self.ssp, KerberosSSP): + # Kerberos. Ignore + pass else: - tempcred = ComputeNetlogonCredentialDES( - self.ClientStoredCredential, self.SessionKey - ) - if tempcred != auth.Credential.data: - raise ValueError("Server netlogon authenticator is wrong !") + raise ValueError("Invalid ssp case !") def establish_secure_channel( self, @@ -879,6 +891,3 @@ def establish_secure_channel( # An error occurred netr_server_authkerb_response.show() raise ValueError("NetrServerAuthenticateKerberos failed !") - - # The NRPC session key is in this case the kerberos one - self.SessionKey = self.sspcontext.SessionKey diff --git a/scapy/layers/windows/erref.py b/scapy/layers/windows/erref.py index ba17053ce12..b18cacf769e 100644 --- a/scapy/layers/windows/erref.py +++ b/scapy/layers/windows/erref.py @@ -17,6 +17,7 @@ 0x00000011: "ERROR_NOT_SAME_DEVICE", 0x00000013: "ERROR_WRITE_PROTECT", 0x00000057: "ERROR_INVALID_PARAMETER", + 0xC000006A: "STATUS_WRONG_PASSWORD", 0x0000007A: "ERROR_INSUFFICIENT_BUFFER", 0x0000007B: "ERROR_INVALID_NAME", 0x000000A1: "ERROR_BAD_PATHNAME", From 30b0246e95479d785c0f1625e978d57fa9031074 Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Fri, 13 Mar 2026 19:36:27 +0100 Subject: [PATCH 2/4] Kerberos: handle the LDAP case. --- scapy/layers/gssapi.py | 13 ++++++++++++ scapy/layers/kerberos.py | 40 ++++++++++++++++++++++------------- scapy/layers/ldap.py | 33 +++++++++++++++++++++++------ scapy/layers/msrpce/msdcom.py | 6 ++++++ scapy/layers/msrpce/msnrpc.py | 7 +++--- scapy/modules/ldaphero.py | 11 +++------- 6 files changed, 77 insertions(+), 33 deletions(-) diff --git a/scapy/layers/gssapi.py b/scapy/layers/gssapi.py index d14f2360f43..6f4d5e91343 100644 --- a/scapy/layers/gssapi.py +++ b/scapy/layers/gssapi.py @@ -403,6 +403,19 @@ class GSS_S_FLAGS(IntFlag): GSS_S_ALLOW_MISSING_BINDINGS = 0x10000000 +class GSS_QOP_REQ_FLAGS(IntFlag): + """ + Used for qop_flags + """ + + # Windows' API requires requesters to add an extra buffer of type + # 'SECBUFFER_PADDING' to receive the padding. The GSS_WrapEx API + # does not provide such a mechanism and always uses it. However + # some implementations like LDAP actually require NO padding, which + # therefore can't be achieved with GSS_WrapEx. + GSS_S_NO_SECBUFFER_PADDING = 0x10000000 + + class SSP: """ The general SSP class diff --git a/scapy/layers/kerberos.py b/scapy/layers/kerberos.py index a187c39fa3b..0540d17e3d4 100644 --- a/scapy/layers/kerberos.py +++ b/scapy/layers/kerberos.py @@ -128,21 +128,22 @@ from scapy.volatile import GeneralizedTime, RandNum, RandBin from scapy.layers.gssapi import ( - GSSAPI_BLOB, + _GSSAPI_OIDS, + _GSSAPI_SIGNATURE_OIDS, GSS_C_FLAGS, GSS_C_NO_CHANNEL_BINDINGS, + GSS_QOP_REQ_FLAGS, GSS_S_BAD_BINDINGS, GSS_S_BAD_MECH, GSS_S_COMPLETE, GSS_S_CONTINUE_NEEDED, - GSS_S_DEFECTIVE_TOKEN, GSS_S_DEFECTIVE_CREDENTIAL, + GSS_S_DEFECTIVE_TOKEN, GSS_S_FAILURE, GSS_S_FLAGS, + GSSAPI_BLOB, GssChannelBindings, SSP, - _GSSAPI_OIDS, - _GSSAPI_SIGNATURE_OIDS, ) from scapy.layers.inet import TCP, UDP from scapy.layers.smb import _NV_VERSION @@ -202,7 +203,6 @@ Union, ) - # kerberos APPLICATION @@ -4618,6 +4618,7 @@ class CONTEXT(SSP.CONTEXT): "ServerHostname", "U2U", "KrbSessionKey", # raw Key object + "ST", # the service ticket "STSessionKey", # raw ST Key object (for DCE_STYLE) "SeqNum", # for AP "SendSeqNum", # for MIC @@ -4640,6 +4641,7 @@ def __init__(self, IsAcceptor, req_flags=None): self.SendSeqNum = 0 self.RecvSeqNum = 0 self.KrbSessionKey = None + self.ST = None self.STSessionKey = None self.IsAcceptor = IsAcceptor self.UPN = None @@ -4752,7 +4754,7 @@ def GSS_VerifyMICEx(self, Context, msgs, signature): if sig != signature.root.SGN_CKSUM: raise ValueError("ERROR: Checksums don't match") - def GSS_WrapEx(self, Context, msgs, qop_req=0): + def GSS_WrapEx(self, Context, msgs, qop_req: GSS_QOP_REQ_FLAGS = 0): """ [MS-KILE] sect 3.4.5.4 @@ -4786,9 +4788,16 @@ def GSS_WrapEx(self, Context, msgs, qop_req=0): Data = b"".join(x.data for x in msgs if x.conf_req_flag) DataLen = len(Data) # 2. Add filler - # [MS-KILE] sect 3.4.5.4.1 - "For AES-SHA1 ciphers, the EC must not - # be zero" - tok.root.EC = ((-DataLen) % Context.KrbSessionKey.ep.blocksize) or 16 + if qop_req & GSS_QOP_REQ_FLAGS.GSS_S_NO_SECBUFFER_PADDING: + # Special case for compatibility with Windows API. See + # GSS_QOP_REQ_FLAGS. + tok.root.EC = 0 + else: + # [MS-KILE] sect 3.4.5.4.1 - "For AES-SHA1 ciphers, the EC must not + # be zero" + tok.root.EC = ( + (-DataLen) % Context.KrbSessionKey.ep.blocksize + ) or 16 Filler = b"\x00" * tok.root.EC Data += Filler # 3. Add first 16 octets of the Wrap token "header" @@ -5142,7 +5151,7 @@ def GSS_Init_sec_context( # Store TGT, self.TGT = res.asrep.ticket self.TGTSessionKey = res.sessionkey - else: + elif self.TGTSessionKey is None: # We have a TGT and were passed its key self.TGTSessionKey = self.KEY @@ -5166,11 +5175,12 @@ def GSS_Init_sec_context( return Context, None, GSS_S_FAILURE # Store the service ticket and associated key - self.ST, Context.STSessionKey = res.tgsrep.ticket, res.sessionkey + Context.ST, Context.STSessionKey = res.tgsrep.ticket, res.sessionkey elif not self.KEY: raise ValueError("Must provide KEY with ST") else: # We were passed a ST and its key + Context.ST = self.ST Context.STSessionKey = self.KEY if Context.flags & GSS_C_FLAGS.GSS_C_DELEG_FLAG: @@ -5179,8 +5189,8 @@ def GSS_Init_sec_context( ) # Save ServerHostname - if len(self.ST.sname.nameString) == 2: - Context.ServerHostname = self.ST.sname.nameString[1].val.decode() + if len(Context.ST.sname.nameString) == 2: + Context.ServerHostname = Context.ST.sname.nameString[1].val.decode() # Build the KRB-AP apOptions = ASN1_BIT_STRING("000") @@ -5191,7 +5201,7 @@ def GSS_Init_sec_context( Context.U2U = True ap_req = KRB_AP_REQ( apOptions=apOptions, - ticket=self.ST, + ticket=Context.ST, authenticator=EncryptedData(), ) @@ -5393,7 +5403,7 @@ def GSS_Accept_sec_context( key=self.KEY, password=self.PASSWORD, ) - self.TGT, self.KEY = res.asrep.ticket, res.sessionkey + self.TGT, self.TGTSessionKey = res.asrep.ticket, res.sessionkey # Server receives AP-req, sends AP-rep if isinstance(input_token, KRB_AP_REQ): diff --git a/scapy/layers/ldap.py b/scapy/layers/ldap.py index 923c54d9281..047ece69199 100644 --- a/scapy/layers/ldap.py +++ b/scapy/layers/ldap.py @@ -83,16 +83,17 @@ from scapy.layers.inet import IP, TCP, UDP from scapy.layers.inet6 import IPv6 from scapy.layers.gssapi import ( + _GSSAPI_Field, ChannelBindingType, - GSSAPI_BLOB, - GSSAPI_BLOB_SIGNATURE, GSS_C_FLAGS, GSS_C_NO_CHANNEL_BINDINGS, + GSS_QOP_REQ_FLAGS, GSS_S_COMPLETE, GSS_S_CONTINUE_NEEDED, + GSSAPI_BLOB_SIGNATURE, + GSSAPI_BLOB, GssChannelBindings, SSP, - _GSSAPI_Field, ) from scapy.layers.netbios import NBTDatagram from scapy.layers.smb import ( @@ -1838,16 +1839,21 @@ def connect( """ self.ssl = use_ssl self.sslcontext = sslcontext + self.timeout = timeout + self.host = host if port is None: if self.ssl: port = 636 else: port = 389 + + # Create and configure socket sock = socket.socket() - self.timeout = timeout - self.host = host + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) sock.settimeout(timeout) + + # Connect if self.verb: print( "\u2503 Connecting to %s on port %s%s..." @@ -1864,6 +1870,7 @@ def connect( "\u2514 Connected from %s" % repr(sock.getsockname()) ) ) + # For SSL, build and apply SSLContext if self.ssl: if self.sslcontext is None: @@ -1876,14 +1883,16 @@ def connect( else: context = self.sslcontext sock = context.wrap_socket(sock, server_hostname=sni or host) + # Wrap the socket in a Scapy socket if self.ssl: - self.sock = SSLStreamSocket(sock, LDAP) # Compute the channel binding token (CBT) self.chan_bindings = GssChannelBindings.fromssl( ChannelBindingType.TLS_SERVER_END_POINT, sslsock=sock, ) + + self.sock = SSLStreamSocket(sock, LDAP) else: self.sock = StreamSocket(sock, LDAP) @@ -1891,12 +1900,14 @@ def sr1(self, protocolOp, controls: List[LDAP_Control] = None, **kwargs): self.messageID += 1 if self.verb: print(conf.color_theme.opening(">> %s" % protocolOp.__class__.__name__)) + # Build packet pkt = LDAP( messageID=self.messageID, protocolOp=protocolOp, Controls=controls, ) + # If signing / encryption is used, apply if self.sasl_wrap: pkt = LDAP_SASL_Buffer( @@ -1904,8 +1915,13 @@ def sr1(self, protocolOp, controls: List[LDAP_Control] = None, **kwargs): self.sspcontext, bytes(pkt), conf_req_flag=self.encrypt, + # LDAP on Windows doesn't use SECBUFFER_PADDING, which + # isn't supported by GSS_WrapEx. We add our own flag to + # tell it. + qop_req=GSS_QOP_REQ_FLAGS.GSS_S_NO_SECBUFFER_PADDING, ) ) + # Send / Receive resp = self.sock.sr1( pkt, @@ -1918,6 +1934,7 @@ def sr1(self, protocolOp, controls: List[LDAP_Control] = None, **kwargs): resp.show() print(conf.color_theme.fail("! Got unsolicited notification.")) return resp + # If signing / encryption is used, unpack if self.sasl_wrap: if resp.Buffer: @@ -1929,6 +1946,8 @@ def sr1(self, protocolOp, controls: List[LDAP_Control] = None, **kwargs): ) else: resp = None + + # Verbose display if self.verb: if not resp: print(conf.color_theme.fail("! Bad response.")) @@ -2287,7 +2306,7 @@ def search( controlType="1.2.840.113556.1.4.319", criticality=True, controlValue=LDAP_realSearchControlValue( - size=200, # paging to 200 per 200 + size=100, # paging to 100 per 100 cookie=cookie, ), ) diff --git a/scapy/layers/msrpce/msdcom.py b/scapy/layers/msrpce/msdcom.py index d60bc7ed834..c08de3cf8f7 100644 --- a/scapy/layers/msrpce/msdcom.py +++ b/scapy/layers/msrpce/msdcom.py @@ -16,6 +16,7 @@ import uuid from scapy.config import conf +from scapy.error import log_runtime from scapy.packet import Packet, bind_layers from scapy.fields import ( ConditionalField, @@ -1245,6 +1246,11 @@ def _ChoseRPCBinding(self, bindings: List[STRINGBINDING]): socket.gethostbyname(host) except Exception: # Resolution failed. Skip. + log_runtime.warning( + "Resolution of '%s' failed, check your DNS and default " + "DNS prefix. Kerberos authentication will likely not work." + % host + ) continue # Success diff --git a/scapy/layers/msrpce/msnrpc.py b/scapy/layers/msrpce/msnrpc.py index f88d67cce6b..187fa286d1d 100644 --- a/scapy/layers/msrpce/msnrpc.py +++ b/scapy/layers/msrpce/msnrpc.py @@ -643,7 +643,7 @@ def create_authenticator(self): Timestamp=ts, ) elif isinstance(self.ssp, KerberosSSP): - # Kerberos. + # Kerberos. This is off spec :( return PNETLOGON_AUTHENTICATOR() else: raise ValueError("Invalid ssp case !") @@ -670,8 +670,9 @@ def validate_authenticator(self, auth): if tempcred != auth.Credential.data: raise ValueError("Server netlogon authenticator is wrong !") elif isinstance(self.ssp, KerberosSSP): - # Kerberos. Ignore - pass + # Kerberos. This is off spec :( + if bytes(auth) != b"\x00" * 12: + raise ValueError("Server netlogon authenticator is wrong !") else: raise ValueError("Invalid ssp case !") diff --git a/scapy/modules/ldaphero.py b/scapy/modules/ldaphero.py index 8842cf3b593..1a43e55ec66 100644 --- a/scapy/modules/ldaphero.py +++ b/scapy/modules/ldaphero.py @@ -149,7 +149,7 @@ class LDAPHero: def __init__( self, ssp: SSP = None, - mech: LDAP_BIND_MECHS = None, + mech: LDAP_BIND_MECHS = LDAP_BIND_MECHS.SASL_GSS_SPNEGO, sign: bool = True, encrypt: bool = False, host: str = None, @@ -165,15 +165,9 @@ def __init__( use_krb5ccname: bool = False, ): self.client = LDAP_Client() - if ( - ssp is None - and mech in [None, LDAP_BIND_MECHS.SASL_GSS_SPNEGO] - and UPN - and host - ): + if ssp is None and mech == LDAP_BIND_MECHS.SASL_GSS_SPNEGO and UPN and host: # We allow the SSP to be provided through arguments. # In that case, use SPNEGO - mech = LDAP_BIND_MECHS.SASL_GSS_SPNEGO ssp = SPNEGOSSP.from_cli_arguments( UPN=UPN, target=host, @@ -470,6 +464,7 @@ def bindtypechange(*args, **kwargs): elif bindtype == LDAP_BIND_MECHS.SICILY: domentry.config(state=tk.DISABLED) signbtn.config(state=tk.DISABLED) + signv.set(False) encrbtn.config(state=tk.NORMAL) else: domentry.config(state=tk.NORMAL, textvariable=domainv) From 62f59c4dc763335614562b33040c950f1680df03 Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Fri, 13 Mar 2026 23:08:14 +0100 Subject: [PATCH 3/4] Improve handling of SMB_Server RPCs --- scapy/layers/dcerpc.py | 2 +- scapy/layers/smbserver.py | 146 +++++++++++++++++++++++++++----------- 2 files changed, 107 insertions(+), 41 deletions(-) diff --git a/scapy/layers/dcerpc.py b/scapy/layers/dcerpc.py index 20a2d1d5376..a79b90b9ce8 100644 --- a/scapy/layers/dcerpc.py +++ b/scapy/layers/dcerpc.py @@ -1461,7 +1461,7 @@ def getfield_and_val(self, attr): pass raise - def valueof(self, request): + def valueof(self, request: str): """ Util to get the value of a NDRField, ignoring arrays, pointers, etc. """ diff --git a/scapy/layers/smbserver.py b/scapy/layers/smbserver.py index 99c6ba7d89f..784f791b5ec 100644 --- a/scapy/layers/smbserver.py +++ b/scapy/layers/smbserver.py @@ -33,6 +33,7 @@ from scapy.layers.dcerpc import ( DCERPC_Transport, NDRUnion, + NDRPointer, ) from scapy.layers.gssapi import ( GSS_S_COMPLETE, @@ -1658,46 +1659,74 @@ def netr_share_enum(self, req): NetrShareEnum [MS-SRVS] "retrieves information about each shared resource on a server." """ - nbEntries = len(self.shares) - return NetrShareEnum_Response( + level = req.InfoStruct.Level + + # Create response + resp = NetrShareEnum_Response( InfoStruct=LPSHARE_ENUM_STRUCT( - Level=1, + Level=level, ShareInfo=NDRUnion( - tag=1, - value=SHARE_INFO_1_CONTAINER( - Buffer=[ - # Add shares - LPSHARE_INFO_1( - shi1_netname=x.name, - shi1_type=x.type, - shi1_remark=x.remark, - ) - for x in self.shares - ], - EntriesRead=nbEntries, - ), + tag=level, + value=None, ), ), - TotalEntries=nbEntries, ndr64=self.ndr64, ) + if level == 1: + nbEntries = len(self.shares) + resp.InfoStruct.ShareInfo.value = NDRPointer( + referent_id=0x20000, + value=SHARE_INFO_1_CONTAINER( + Buffer=[ + # Add shares + LPSHARE_INFO_1( + shi1_netname=x.name, + shi1_type=x.type, + shi1_remark=x.remark, + ) + for x in self.shares + ], + EntriesRead=nbEntries, + ), + ) + resp.TotalEntries = nbEntries + else: + # We only support level 1 :( + resp.status = 0x0000007C # ERROR_INVALID_LEVEL + + return resp + @DCERPC_Server.answer(NetrWkstaGetInfo_Request) def netr_wksta_getinfo(self, req): """ NetrWkstaGetInfo [MS-SRVS] "returns information about the configuration of a workstation." """ - return NetrWkstaGetInfo_Response( + level = req.Level + + # Create response + resp = NetrWkstaGetInfo_Response( WkstaInfo=NDRUnion( - tag=100, + tag=level, + value=None, + ), + ndr64=self.ndr64, + ) + + if level == 100: + resp.WkstaInfo.value = NDRPointer( + referent_id=0x20000, value=LPWKSTA_INFO_100( wki100_platform_id=500, # NT wki100_ver_major=5, ), - ), - ndr64=self.ndr64, - ) + ) + else: + # We only support level 101 :( + resp.status = 0x0000007C # ERROR_INVALID_LEVEL + + return resp @DCERPC_Server.answer(NetrServerGetInfo_Request) def netr_server_getinfo(self, req): @@ -1706,38 +1735,75 @@ def netr_server_getinfo(self, req): "retrieves current configuration information for CIFS and SMB Version 1.0 servers." """ - return NetrServerGetInfo_Response( - ServerInfo=NDRUnion( - tag=101, - value=LPSERVER_INFO_101( - sv101_platform_id=500, # NT - sv101_name=req.ServerName.value.value[0].value, - sv101_version_major=6, - sv101_version_minor=1, - sv101_type=1, # Workstation - ), + level = req.Level + + # Create response + resp = NetrServerGetInfo_Response( + InfoStruct=NDRUnion( + tag=level, + value=None, ), ndr64=self.ndr64, ) + if level == 101: + resp.InfoStruct.value = NDRPointer( + referent_id=0x20000, + value=LPSERVER_INFO_101( + sv101_platform_id=500, # NT + sv101_name="WORKSTATION", + sv101_version_major=10, + sv101_version_minor=0, + sv101_version_type=1, # Workstation + ), + ) + else: + # We only support level 101 :( + resp.status = 0x0000007C # ERROR_INVALID_LEVEL + + return resp + @DCERPC_Server.answer(NetrShareGetInfo_Request) def netr_share_getinfo(self, req): """ NetrShareGetInfo [MS-SRVS] "retrieves information about a particular shared resource on a server." """ - return NetrShareGetInfo_Response( - ShareInfo=NDRUnion( - tag=1, - value=LPSHARE_INFO_1( - shi1_netname=req.NetName.value[0].value, - shi1_type=0, - shi1_remark=b"", - ), + level = req.Level + share_netname = req.payload.payload.valueof("NetName").decode() + + # Create response + resp = NetrShareGetInfo_Response( + InfoStruct=NDRUnion( + tag=level, + value=None, ), ndr64=self.ndr64, ) + # Find the share the client is asking details for + try: + share = next(x for x in self.shares if x.name == share_netname) + except StopIteration: + # Share doesn't exist ! + resp.status = 0x00000906 # NERR_NetNameNotFound + return resp + + if level == 1: + resp.InfoStruct.value = NDRPointer( + referent_id=0x20000, + value=LPSHARE_INFO_1( + shi1_netname=share.name, + shi1_type=share.type, + shi1_remark=share.remark, + ), + ) + else: + # We only support level 1 :( + resp.status = 0x0000007C # ERROR_INVALID_LEVEL + + return resp + # Util From 5bb783f080a83694981a00d48e71d1d67aa12b7e Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Sat, 14 Mar 2026 18:44:28 +0100 Subject: [PATCH 4/4] MS-NRPC: add unit tests for Authenticators --- test/scapy/layers/msrpce/msnrpc.uts | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/test/scapy/layers/msrpce/msnrpc.uts b/test/scapy/layers/msrpce/msnrpc.uts index f678d7f6274..c4ca0829d4e 100644 --- a/test/scapy/layers/msrpce/msnrpc.uts +++ b/test/scapy/layers/msrpce/msnrpc.uts @@ -503,3 +503,39 @@ try: assert False, "No error was reported, but there should have been one" except ValueError: pass + += [NetlogonClient] - Build and validate authenticator - Netlogon SSP + +from unittest import mock +from scapy.layers.msrpce.msnrpc import NetlogonClient, NetlogonSSP + +client = NetlogonClient() +client.SessionKey = b'\xec\xee\xda\xb70\xdeQ\x98\xa4\xceDErt\xcem' +client.ssp = NetlogonSSP(client.SessionKey, "WKS01", "DOMAIN") +client.ClientStoredCredential = b'\xf8\x890D\x1b_\xf2x' + +# Build +with mock.patch('scapy.layers.msrpce.msnrpc.time.time', side_effect=lambda: 1773509346): + authenticator = client.create_authenticator() + assert authenticator.Timestamp == 1773509346 + assert bytes(authenticator) == b'a\x18\xa3\xebu`3\x84\xe2\x9a\xb5i' + +# Verify +authenticator = PNETLOGON_AUTHENTICATOR(b'`6n\xd0\x80\x91"\x06\x00\x00\x00\x00') +client.validate_authenticator(authenticator) + += [NetlogonClient] - Build and validate authenticator - Kerberos SSP + +from scapy.layers.msrpce.msnrpc import NetlogonClient, PNETLOGON_AUTHENTICATOR +from scapy.layers.kerberos import KerberosSSP + +client = NetlogonClient() +client.ssp = KerberosSSP(UPN="WKS01@DOMAIN", PASSWORD="Password") + +# Build +authenticator = client.create_authenticator() +assert bytes(authenticator) == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + +# Verify +authenticator = PNETLOGON_AUTHENTICATOR(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') +client.validate_authenticator(authenticator)