diff --git a/impacket/dcerpc/v5/dcomrt.py b/impacket/dcerpc/v5/dcomrt.py index 0e67d1f653..3ebe9e0447 100644 --- a/impacket/dcerpc/v5/dcomrt.py +++ b/impacket/dcerpc/v5/dcomrt.py @@ -1249,6 +1249,17 @@ def get_cinstance(self): def set_cinstance(self, cinstance): self.__cinstance = cinstance + + def is_target_loopback(self): + """ + Detect all loopback IP addresses. + This is assuming 127.0.0.0/8 is IPv4 loopback. + Also accounting for IPv6 loopback addresses. + This should only be used to filter user input and automatically redirect any DCOM connections + from local machines to correct string-bindings on the local endpoint mapper. + """ + return self.get_target().startswith('127.') \ + or self.get_target() == '::1' or self.get_target() == '0:0:0:0:0:0:0:1' # IPv6 loopback addresses. def is_fqdn(self): # I will assume the following @@ -1257,7 +1268,7 @@ def is_fqdn(self): # an FQDN with ':' # Is it isn't both, then it is a FQDN try: - socket.inet_aton(self.__target) + socket.inet_aton(self.get_target()) except: # Not an IPv4 try: @@ -1268,9 +1279,8 @@ def is_fqdn(self): return False def connect(self, iid = None): - if (self.__target in INTERFACE.CONNECTIONS) is True: - if current_thread().name in INTERFACE.CONNECTIONS[self.__target] and \ - (self.__oxid in INTERFACE.CONNECTIONS[self.__target][current_thread().name]) is True: + if self.__target in INTERFACE.CONNECTIONS: + if current_thread().name in INTERFACE.CONNECTIONS[self.__target] and self.__oxid in INTERFACE.CONNECTIONS[self.__target][current_thread().name]: dce = INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['dce'] currentBinding = INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['currentBinding'] if currentBinding == iid: @@ -1281,6 +1291,7 @@ def connect(self, iid = None): INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['dce'] = newDce INTERFACE.CONNECTIONS[self.__target][current_thread().name][self.__oxid]['currentBinding'] = iid else: + # No existing connection, create a new one. stringBindings = self.get_cinstance().get_string_bindings() # No OXID present, we should create a new connection and store it stringBinding = None @@ -1291,6 +1302,8 @@ def connect(self, iid = None): # 1) it's an IPv4 address # 2) it's an IPv6 address # 3) it's a NetBios Name + # 4) it's 127.0.0.1 - not resolved via epmapper. + # if it really is a loopback address - return true on any stringbinding. # we should handle all this cases accordingly # Does this match exactly what get_target() returns? LOG.debug('StringBinding: %s' % strBinding['aNetworkAddr']) @@ -1303,9 +1316,20 @@ def connect(self, iid = None): binding = strBinding['aNetworkAddr'] bindingPort = '' + if self.is_target_loopback(): + # Accept the first matched string-binding. + # This is because we know we want to connect no matter what. + LOG.debug('Detected loopback DCOM connection attempt, using first available StringBinding.') + stringBinding = 'ncacn_ip_tcp:' + strBinding['aNetworkAddr'][:-1] + # TODO: Decide if we also want to change our self.__target accordingly...? + # Notice - it breaks stuff. + # self.__target = binding + break + if binding.upper().find(self.get_target().upper()) >= 0: stringBinding = 'ncacn_ip_tcp:' + strBinding['aNetworkAddr'][:-1] break + # If get_target() is a FQDN, does it match the hostname? elif isTargetFQDN and binding.upper().find(self.get_target().upper().partition('.')[0]) >= 0: # Here we replace the aNetworkAddr with self.get_target()