diff --git a/j1939/j1939_21.py b/j1939/j1939_21.py index 2ce1562..0a54f6a 100644 --- a/j1939/j1939_21.py +++ b/j1939/j1939_21.py @@ -305,6 +305,7 @@ def _process_tp_cm(self, mid, dest_address, data, timestamp): 'deadline': time.time() + self.Timeout.T2, 'src_address' : src_address, 'dest_address' : dest_address, + 'next_seq_num' : 1, } self.__send_tp_cts(dest_address, src_address, self._rcv_buffer[buffer_hash]['num_packages_max_rec'], 1, pgn) @@ -367,6 +368,7 @@ def _process_tp_cm(self, mid, dest_address, data, timestamp): "deadline": time.time() + self.Timeout.T1, 'src_address' : src_address, 'dest_address' : dest_address, + 'next_seq_num' : 1, } self.__job_thread_wakeup() elif control_byte == self.ConnectionMode.ABORT: @@ -390,9 +392,19 @@ def _process_tp_dt(self, mid, dest_address, data, timestamp): # TODO: LOG/TRACE/EXCEPTION? return + if self._rcv_buffer[buffer_hash]['next_seq_num'] != sequence_number: + logger.critical(f"packet error. out-of-sequence pgn: {self._rcv_buffer[buffer_hash]['pgn']} " + f"required: {self._rcv_buffer[buffer_hash]['next_seq_num']} " + f"received: {sequence_number}") + del self._rcv_buffer[buffer_hash] + self.__job_thread_wakeup() + return + # get data self._rcv_buffer[buffer_hash]['data'].extend(data[1:]) + self._rcv_buffer[buffer_hash]['next_seq_num'] = sequence_number + 1 + # message is complete with sending an acknowledge if len(self._rcv_buffer[buffer_hash]['data']) >= self._rcv_buffer[buffer_hash]['message_size']: logger.info("finished RCV of PGN {} with size {}".format(self._rcv_buffer[buffer_hash]['pgn'], self._rcv_buffer[buffer_hash]['message_size']))